diff options
author | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-20 18:03:22 +0000 |
---|---|---|
committer | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-20 18:03:22 +0000 |
commit | e5fea266fccf7dc2407d591c2a0420dca9c119b0 (patch) | |
tree | 466021b3b4288586d3e32ac251afb7ce67c0ff33 /tools/sharding_supervisor | |
parent | 5f716a97d69a5f1fbc1ad3282bce1969db2eb037 (diff) | |
download | chromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.zip chromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.tar.gz chromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.tar.bz2 |
GTTF: remove parts of sharding_supervisor that might be hang-prone.
BUG=none
TEST=none
Review URL: https://codereview.chromium.org/11367047
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168836 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/sharding_supervisor')
-rwxr-xr-x | tools/sharding_supervisor/sharding_supervisor.py | 334 | ||||
-rw-r--r-- | tools/sharding_supervisor/stdio_buffer.py | 60 |
2 files changed, 56 insertions, 338 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py index 89d3ec5..f0592ca 100755 --- a/tools/sharding_supervisor/sharding_supervisor.py +++ b/tools/sharding_supervisor/sharding_supervisor.py @@ -18,26 +18,17 @@ import cStringIO import itertools import optparse import os -import Queue import random import re +import subprocess import sys import threading -from stdio_buffer import StdioBuffer from xml.dom import minidom # Add tools/ to path BASE_PATH = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(BASE_PATH, "..")) -try: - import find_depot_tools - # Fixes a bug in Windows where some shards die upon starting - # TODO(charleslee): actually fix this bug - import subprocess2 as subprocess -except ImportError: - # Unable to find depot_tools, so just use standard subprocess - import subprocess SS_USAGE = "python %prog [options] path/to/test [gtest_args]" SS_DEFAULT_NUM_CORES = 4 @@ -186,6 +177,7 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr): return subprocess.Popen( args, stdout=stdout, stderr=stderr, + shell=False, env=env, bufsize=0, universal_newlines=True) @@ -196,86 +188,28 @@ class ShardRunner(threading.Thread): Attributes: supervisor: The ShardingSupervisor that this worker reports to. - counter: Called to get the next shard index to run. - test_start: Regex that detects when a test runs. - test_ok: Regex that detects a passing test. - test_fail: Regex that detects a failing test. - current_test: The name of the currently running test. + shard_index: Shard index to run. """ - def __init__(self, supervisor, counter, test_start, test_ok, test_fail): - """Inits ShardRunner and sets the current test to nothing.""" + def __init__(self, supervisor, shard_index): + """Inits ShardRunner.""" threading.Thread.__init__(self) self.supervisor = supervisor - self.counter = counter - self.test_start = test_start - self.test_ok = test_ok - self.test_fail = test_fail - self.current_test = "" - - def ReportFailure(self, description, index, test_name): - """Assembles and reports a failure line to be printed later.""" - log_line = "%s (%i): %s\n" % (description, index, test_name) - self.supervisor.LogTestFailure(log_line) - - def ProcessLine(self, index, line): - """Checks a shard output line for test status, and reports a failure or - incomplete test if needed. - """ - results = self.test_start.search(line) - if results: - if self.current_test: - self.ReportFailure("INCOMPLETE", index, self.current_test) - self.current_test = results.group(1) - self.supervisor.IncrementTestCount() - return - - results = self.test_ok.search(line) - if results: - self.current_test = "" - return - - results = self.test_fail.search(line) - if results: - self.ReportFailure("FAILED", index, results.group(1)) - self.current_test = "" + self.shard_index = shard_index - def run(self): - """Runs shards and outputs the results. + self.stdout = '' + self.shard = RunShard( + self.supervisor.test, self.supervisor.total_shards, self.shard_index, + self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) - Gets the next shard index from the supervisor, runs it in a subprocess, - and collects the output. The output is read character by character in - case the shard crashes without an ending newline. Each line is processed - as it is finished. - """ - while True: - try: - index = self.counter.get_nowait() - except Queue.Empty: - break - shard_running = True - shard = RunShard( - self.supervisor.test, self.supervisor.total_shards, index, - self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE) - buffer = StdioBuffer(shard) - # Spawn two threads to collect stdio output - stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout) - stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr) - while shard_running: - pipe, line = buffer.readline() - if pipe is None and line is None: - shard_running = False - if not line and not shard_running: - break - self.ProcessLine(index, line) - self.supervisor.LogOutputLine(index, line, pipe) - stdout_collector_thread.join() - stderr_collector_thread.join() - if self.current_test: - self.ReportFailure("INCOMPLETE", index, self.current_test) - self.supervisor.ShardIndexCompleted(index) - if shard.returncode != 0: - self.supervisor.LogShardFailure(index) + # Do not block program exit in case we have to leak the thread. + self.daemon = True + self.flag = threading.Event() + + def run(self): + """Runs the shard and outputs the results.""" + self.stdout, _ = self.shard.communicate() + self.flag.set() class ShardingSupervisor(object): @@ -286,15 +220,9 @@ class ShardingSupervisor(object): num_shards_to_run: Total number of shards to split the test into. num_runs: Total number of worker threads to create for running shards. color: Indicates which coloring mode to use in the output. - original_order: True if shard output should be printed as it comes. - prefix: True if each line should indicate the shard index. retry_percent: Integer specifying the max percent of tests to retry. gtest_args: The options to pass to gtest. - failed_tests: List of statements from shard output indicating a failure. failed_shards: List of shards that contained failing tests. - shards_completed: List of flags indicating which shards have finished. - shard_output: Buffer that stores output from each shard as (stdio, line). - test_counter: Stores the total number of tests run. total_slaves: Total number of slaves running this test. slave_index: Current slave to run tests for. @@ -312,9 +240,8 @@ class ShardingSupervisor(object): SHARD_COMPLETED = object() - def __init__(self, test, num_shards_to_run, num_runs, color, original_order, - prefix, retry_percent, timeout, total_slaves, slave_index, - gtest_args): + def __init__(self, test, num_shards_to_run, num_runs, color, + retry_percent, timeout, total_slaves, slave_index, gtest_args): """Inits ShardingSupervisor with given options and gtest arguments.""" self.test = test # Number of shards to run locally. @@ -324,64 +251,54 @@ class ShardingSupervisor(object): self.slave_index = slave_index self.num_runs = num_runs self.color = color - self.original_order = original_order - self.prefix = prefix self.retry_percent = retry_percent self.timeout = timeout self.gtest_args = gtest_args - self.failed_tests = [] self.failed_shards = [] - self.shards_completed = [False] * self.num_shards_to_run - self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)] - self.test_counter = itertools.count() def ShardTest(self): """Runs the test and manages the worker threads. Runs the test and outputs a summary at the end. All the tests in the suite are run by creating (cores * runs_per_core) threads and - (cores * shards_per_core) shards. When all the worker threads have - finished, the lines saved in failed_tests are printed again. If enabled, - and failed tests that do not have FLAKY or FAILS in their names are run - again, serially, and the results are printed. + (cores * shards_per_core) shards. Returns: - 1 if some unexpected (not FLAKY or FAILS) tests failed, 0 otherwise. + 1 on failure, 0 otherwise. """ - # Regular expressions for parsing GTest logs. Test names look like - # SomeTestCase.SomeTest - # SomeName/SomeTestCase.SomeTest/1 - # This regex also matches SomeName.SomeTest/1 and - # SomeName/SomeTestCase.SomeTest, which should be harmless. - test_name_regex = r"((\w+/)?\w+\.\w+(/\d+)?)" - - # Regex for filtering out ANSI escape codes when using color. - ansi_regex = r"(?:\x1b\[.*?[a-zA-Z])?" - - test_start = re.compile( - ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex) - test_ok = re.compile( - ansi_regex + r"\[\s+OK\s+\] " + ansi_regex + test_name_regex) - test_fail = re.compile( - ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex) - workers = [] - counter = Queue.Queue() start_point = self.num_shards_to_run * self.slave_index for i in range(start_point, start_point + self.num_shards_to_run): - counter.put(i) - - for i in range(self.num_runs): - worker = ShardRunner( - self, counter, test_start, test_ok, test_fail) + worker = ShardRunner(self, i) worker.start() workers.append(worker) - if self.original_order: - for worker in workers: - worker.join() - else: - self.WaitForShards() + + for worker in workers: + worker.flag.wait(timeout=self.timeout) + if worker.is_alive(): + # Print something to prevent buildbot from killing us + # because of no output for long time. + print 'TIMED OUT WAITING FOR SHARD, KILLING' + sys.stdout.flush() + + # Make sure we count the timeout as a failure. + self.LogShardFailure(worker.shard_index) + + worker.shard.kill() + worker.flag.wait(timeout=self.timeout) + else: + if worker.shard.returncode != 0: + self.LogShardFailure(worker.shard_index) + + if worker.is_alive(): + print 'LEAKING THREAD, SORRY' + else: + # Note: print guarantees there will be a newline at the end. This helps + # preventing mixed-up logs. + print worker.stdout + + sys.stdout.flush() # All the shards are done. Merge all the XML files and generate the # main one. @@ -403,134 +320,16 @@ class ShardingSupervisor(object): self.WriteText(sys.stdout, "\nFAILED SHARDS: %s\n" % str(self.failed_shards), "\x1b[1;5;31m") + return 1 else: self.WriteText(sys.stdout, "\nALL SHARDS PASSED!\n", "\x1b[1;5;32m") - self.PrintSummary(self.failed_tests) - if self.retry_percent < 0: - return len(self.failed_shards) > 0 - - self.failed_tests = [x for x in self.failed_tests if x.find("FAILS_") < 0] - self.failed_tests = [x for x in self.failed_tests if x.find("FLAKY_") < 0] - if not self.failed_tests: return 0 - return self.RetryFailedTests() - - def LogTestFailure(self, line): - """Saves a line in the lsit of failed tests to be printed at the end.""" - if line not in self.failed_tests: - self.failed_tests.append(line) def LogShardFailure(self, index): """Records that a test in the given shard has failed.""" - self.failed_shards.append(index) - - def WaitForShards(self): - """Prints the output from each shard in consecutive order, waiting for - the current shard to finish before starting on the next shard. - """ - try: - for shard_index in range(self.num_shards_to_run): - while True: - try: - pipe, line = self.shard_output[shard_index].get(True, self.timeout) - except Queue.Empty: - # Shard timed out, notice failure and move on. - self.LogShardFailure(shard_index) - # TODO(maruel): Print last test. It'd be simpler to have the - # processing in the main thread. - # TODO(maruel): Make sure the worker thread terminates. - sys.stdout.write('TIMED OUT\n\n') - LogTestFailure( - 'FAILURE: SHARD %d TIMED OUT; %d seconds' % ( - shard_index, self.timeout)) - break - if line is self.SHARD_COMPLETED: - break - sys.stdout.write(line) - except: - sys.stdout.flush() - print 'CAUGHT EXCEPTION: dumping remaining data:' - for shard_index in range(self.num_shards_to_run): - while True: - try: - pipe, line = self.shard_output[shard_index].get(False) - except Queue.Empty: - # Shard timed out, notice failure and move on. - self.LogShardFailure(shard_index) - break - if line is self.SHARD_COMPLETED: - break - sys.stdout.write(line) - raise - - def LogOutputLine(self, index, line, pipe=sys.stdout): - """Either prints the shard output line immediately or saves it in the - output buffer, depending on the settings. Also optionally adds a prefix. - Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue. - """ - # Fix up the index. - array_index = index - (self.num_shards_to_run * self.slave_index) - if self.prefix: - line = "%i>%s" % (index, line) - if self.original_order: - pipe.write(line) - else: - self.shard_output[array_index].put((pipe, line)) - - def IncrementTestCount(self): - """Increments the number of tests run. This is relevant to the - --retry-percent option. - """ - self.test_counter.next() - - def ShardIndexCompleted(self, index): - """Records that a shard has finished so the output from the next shard - can now be printed. - """ - # Fix up the index. - array_index = index - (self.num_shards_to_run * self.slave_index) - self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED)) - - def RetryFailedTests(self): - """Reruns any failed tests serially and prints another summary of the - results if no more than retry_percent failed. - """ - num_tests_run = self.test_counter.next() - if len(self.failed_tests) > self.retry_percent * num_tests_run: - sys.stdout.write("\nNOT RETRYING FAILED TESTS (too many failed)\n") - return 1 - self.WriteText(sys.stdout, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m") - sharded_description = re.compile(r": (?:\d+>)?(.*)") - gtest_filters = [sharded_description.search(line).group(1) - for line in self.failed_tests] - failed_retries = [] - - for test_filter in gtest_filters: - args = [self.test, "--gtest_filter=" + test_filter] - # Don't update the xml output files during retry. - stripped_gtests_args = RemoveGTestOutput(self.gtest_args) - args.extend(stripped_gtests_args) - rerun = subprocess.Popen(args) - rerun.wait() - if rerun.returncode != 0: - failed_retries.append(test_filter) - - self.WriteText(sys.stdout, "RETRY RESULTS:\n", "\x1b[1;5;33m") - self.PrintSummary(failed_retries) - return len(failed_retries) > 0 - - def PrintSummary(self, failed_tests): - """Prints a summary of the test results. - - If any shards had failing tests, the list is sorted and printed. Then all - the lines that indicate a test failure are reproduced. - """ - if failed_tests: - self.WriteText(sys.stdout, "FAILED TESTS:\n", "\x1b[1;5;31m") - for line in failed_tests: - sys.stdout.write(line) - else: - self.WriteText(sys.stdout, "ALL TESTS PASSED!\n", "\x1b[1;5;32m") + # There might be some duplicates in case a shard times out. + if index not in self.failed_shards: + self.failed_shards.append(index) def WriteText(self, pipe, text, ansi): """Writes the text to the pipe with the ansi escape code, if colored @@ -562,19 +361,6 @@ def main(): parser.add_option( "-s", "--runshard", type="int", help="single shard index to run") parser.add_option( - "--reorder", action="store_true", - help="ensure that all output from an earlier shard is printed before" - " output from a later shard") - # TODO(charleslee): for backwards compatibility with master.cfg file - parser.add_option( - "--original-order", action="store_true", - help="print shard output in its orginal jumbled order of execution" - " (useful for debugging flaky tests)") - parser.add_option( - "--prefix", action="store_true", - help="prefix each line of shard output with 'N>', where N is the shard" - " index (forced True when --original-order is True)") - parser.add_option( "--random-seed", action="store_true", help="shuffle the tests with a random seed value") parser.add_option( @@ -617,14 +403,6 @@ def main(): gtest_args = ["--gtest_color=%s" % { True: "yes", False: "no"}[options.color]] + args[1:] - if options.original_order: - options.prefix = True - - # TODO(charleslee): for backwards compatibility with buildbot's log_parser - if options.reorder: - options.original_order = False - options.prefix = True - if options.random_seed: seed = random.randint(1, 99999) gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed]) @@ -641,7 +419,7 @@ def main(): parser.error("Invalid shard number given parameters!") shard = RunShard( test, num_shards_to_run, options.runshard, gtest_args, None, None) - shard.communicate() + shard.communicate(timeout=self.timeout) return shard.poll() # When running browser_tests, load the test binary into memory before running @@ -661,8 +439,8 @@ def main(): # shard and run the whole test ss = ShardingSupervisor( test, num_shards_to_run, num_runs, options.color, - options.original_order, options.prefix, options.retry_percent, - options.timeout, options.total_slaves, options.slave_index, gtest_args) + options.retry_percent, options.timeout, options.total_slaves, + options.slave_index, gtest_args) return ss.ShardTest() diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py deleted file mode 100644 index 737374d..0000000 --- a/tools/sharding_supervisor/stdio_buffer.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2012 The Chromium Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Syncronized Standard IO Linebuffer implemented with cStringIO.""" - -import cStringIO -import os -import sys -import threading -import Queue - - -class StdioBuffer(object): - def __init__(self, shard): - self.line_ready_event = threading.Event() - self.queue = Queue.Queue() - self.lock = threading.Lock() - self.completed = 0 - self.shard = shard - - def _pipe_handler(self, system_pipe, program_pipe): - """Helper method for collecting stdio output. Output is collected until - a newline is seen, at which point an event is triggered and the line is - pushed to a buffer as a (stdio, line) tuple.""" - buffer = cStringIO.StringIO() - pipe_running = True - while pipe_running: - char = program_pipe.read(1) - if not char and self.shard.poll() is not None: - pipe_running = False - self.line_ready_event.set() - buffer.write(char) - if char == '\n' or not pipe_running: - line = buffer.getvalue() - if not line and not pipe_running: - with self.lock: - self.completed += 1 - self.line_ready_event.set() - break - self.queue.put((system_pipe, line)) - self.line_ready_event.set() - buffer.close() - buffer = cStringIO.StringIO() - - def handle_pipe(self, system_pipe, program_pipe): - t = threading.Thread(target=self._pipe_handler, args=[system_pipe, - program_pipe]) - t.start() - return t - - def readline(self): - """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a - blocking call.""" - while self.completed < 2 and self.queue.empty(): - self.line_ready_event.wait() - self.line_ready_event.clear() - if not self.queue.empty(): - return self.queue.get_nowait() - else: - return (None, None)
\ No newline at end of file |