summaryrefslogtreecommitdiffstats
path: root/tools/sharding_supervisor
diff options
context:
space:
mode:
authorphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-20 18:03:22 +0000
committerphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-20 18:03:22 +0000
commite5fea266fccf7dc2407d591c2a0420dca9c119b0 (patch)
tree466021b3b4288586d3e32ac251afb7ce67c0ff33 /tools/sharding_supervisor
parent5f716a97d69a5f1fbc1ad3282bce1969db2eb037 (diff)
downloadchromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.zip
chromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.tar.gz
chromium_src-e5fea266fccf7dc2407d591c2a0420dca9c119b0.tar.bz2
GTTF: remove parts of sharding_supervisor that might be hang-prone.
BUG=none TEST=none Review URL: https://codereview.chromium.org/11367047 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168836 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/sharding_supervisor')
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor.py334
-rw-r--r--tools/sharding_supervisor/stdio_buffer.py60
2 files changed, 56 insertions, 338 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 89d3ec5..f0592ca 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -18,26 +18,17 @@ import cStringIO
import itertools
import optparse
import os
-import Queue
import random
import re
+import subprocess
import sys
import threading
-from stdio_buffer import StdioBuffer
from xml.dom import minidom
# Add tools/ to path
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(BASE_PATH, ".."))
-try:
- import find_depot_tools
- # Fixes a bug in Windows where some shards die upon starting
- # TODO(charleslee): actually fix this bug
- import subprocess2 as subprocess
-except ImportError:
- # Unable to find depot_tools, so just use standard subprocess
- import subprocess
SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
SS_DEFAULT_NUM_CORES = 4
@@ -186,6 +177,7 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr):
return subprocess.Popen(
args, stdout=stdout,
stderr=stderr,
+ shell=False,
env=env,
bufsize=0,
universal_newlines=True)
@@ -196,86 +188,28 @@ class ShardRunner(threading.Thread):
Attributes:
supervisor: The ShardingSupervisor that this worker reports to.
- counter: Called to get the next shard index to run.
- test_start: Regex that detects when a test runs.
- test_ok: Regex that detects a passing test.
- test_fail: Regex that detects a failing test.
- current_test: The name of the currently running test.
+ shard_index: Shard index to run.
"""
- def __init__(self, supervisor, counter, test_start, test_ok, test_fail):
- """Inits ShardRunner and sets the current test to nothing."""
+ def __init__(self, supervisor, shard_index):
+ """Inits ShardRunner."""
threading.Thread.__init__(self)
self.supervisor = supervisor
- self.counter = counter
- self.test_start = test_start
- self.test_ok = test_ok
- self.test_fail = test_fail
- self.current_test = ""
-
- def ReportFailure(self, description, index, test_name):
- """Assembles and reports a failure line to be printed later."""
- log_line = "%s (%i): %s\n" % (description, index, test_name)
- self.supervisor.LogTestFailure(log_line)
-
- def ProcessLine(self, index, line):
- """Checks a shard output line for test status, and reports a failure or
- incomplete test if needed.
- """
- results = self.test_start.search(line)
- if results:
- if self.current_test:
- self.ReportFailure("INCOMPLETE", index, self.current_test)
- self.current_test = results.group(1)
- self.supervisor.IncrementTestCount()
- return
-
- results = self.test_ok.search(line)
- if results:
- self.current_test = ""
- return
-
- results = self.test_fail.search(line)
- if results:
- self.ReportFailure("FAILED", index, results.group(1))
- self.current_test = ""
+ self.shard_index = shard_index
- def run(self):
- """Runs shards and outputs the results.
+ self.stdout = ''
+ self.shard = RunShard(
+ self.supervisor.test, self.supervisor.total_shards, self.shard_index,
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
- Gets the next shard index from the supervisor, runs it in a subprocess,
- and collects the output. The output is read character by character in
- case the shard crashes without an ending newline. Each line is processed
- as it is finished.
- """
- while True:
- try:
- index = self.counter.get_nowait()
- except Queue.Empty:
- break
- shard_running = True
- shard = RunShard(
- self.supervisor.test, self.supervisor.total_shards, index,
- self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
- buffer = StdioBuffer(shard)
- # Spawn two threads to collect stdio output
- stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
- stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
- while shard_running:
- pipe, line = buffer.readline()
- if pipe is None and line is None:
- shard_running = False
- if not line and not shard_running:
- break
- self.ProcessLine(index, line)
- self.supervisor.LogOutputLine(index, line, pipe)
- stdout_collector_thread.join()
- stderr_collector_thread.join()
- if self.current_test:
- self.ReportFailure("INCOMPLETE", index, self.current_test)
- self.supervisor.ShardIndexCompleted(index)
- if shard.returncode != 0:
- self.supervisor.LogShardFailure(index)
+ # Do not block program exit in case we have to leak the thread.
+ self.daemon = True
+ self.flag = threading.Event()
+
+ def run(self):
+ """Runs the shard and outputs the results."""
+ self.stdout, _ = self.shard.communicate()
+ self.flag.set()
class ShardingSupervisor(object):
@@ -286,15 +220,9 @@ class ShardingSupervisor(object):
num_shards_to_run: Total number of shards to split the test into.
num_runs: Total number of worker threads to create for running shards.
color: Indicates which coloring mode to use in the output.
- original_order: True if shard output should be printed as it comes.
- prefix: True if each line should indicate the shard index.
retry_percent: Integer specifying the max percent of tests to retry.
gtest_args: The options to pass to gtest.
- failed_tests: List of statements from shard output indicating a failure.
failed_shards: List of shards that contained failing tests.
- shards_completed: List of flags indicating which shards have finished.
- shard_output: Buffer that stores output from each shard as (stdio, line).
- test_counter: Stores the total number of tests run.
total_slaves: Total number of slaves running this test.
slave_index: Current slave to run tests for.
@@ -312,9 +240,8 @@ class ShardingSupervisor(object):
SHARD_COMPLETED = object()
- def __init__(self, test, num_shards_to_run, num_runs, color, original_order,
- prefix, retry_percent, timeout, total_slaves, slave_index,
- gtest_args):
+ def __init__(self, test, num_shards_to_run, num_runs, color,
+ retry_percent, timeout, total_slaves, slave_index, gtest_args):
"""Inits ShardingSupervisor with given options and gtest arguments."""
self.test = test
# Number of shards to run locally.
@@ -324,64 +251,54 @@ class ShardingSupervisor(object):
self.slave_index = slave_index
self.num_runs = num_runs
self.color = color
- self.original_order = original_order
- self.prefix = prefix
self.retry_percent = retry_percent
self.timeout = timeout
self.gtest_args = gtest_args
- self.failed_tests = []
self.failed_shards = []
- self.shards_completed = [False] * self.num_shards_to_run
- self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)]
- self.test_counter = itertools.count()
def ShardTest(self):
"""Runs the test and manages the worker threads.
Runs the test and outputs a summary at the end. All the tests in the
suite are run by creating (cores * runs_per_core) threads and
- (cores * shards_per_core) shards. When all the worker threads have
- finished, the lines saved in failed_tests are printed again. If enabled,
- and failed tests that do not have FLAKY or FAILS in their names are run
- again, serially, and the results are printed.
+ (cores * shards_per_core) shards.
Returns:
- 1 if some unexpected (not FLAKY or FAILS) tests failed, 0 otherwise.
+ 1 on failure, 0 otherwise.
"""
- # Regular expressions for parsing GTest logs. Test names look like
- # SomeTestCase.SomeTest
- # SomeName/SomeTestCase.SomeTest/1
- # This regex also matches SomeName.SomeTest/1 and
- # SomeName/SomeTestCase.SomeTest, which should be harmless.
- test_name_regex = r"((\w+/)?\w+\.\w+(/\d+)?)"
-
- # Regex for filtering out ANSI escape codes when using color.
- ansi_regex = r"(?:\x1b\[.*?[a-zA-Z])?"
-
- test_start = re.compile(
- ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex)
- test_ok = re.compile(
- ansi_regex + r"\[\s+OK\s+\] " + ansi_regex + test_name_regex)
- test_fail = re.compile(
- ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex)
-
workers = []
- counter = Queue.Queue()
start_point = self.num_shards_to_run * self.slave_index
for i in range(start_point, start_point + self.num_shards_to_run):
- counter.put(i)
-
- for i in range(self.num_runs):
- worker = ShardRunner(
- self, counter, test_start, test_ok, test_fail)
+ worker = ShardRunner(self, i)
worker.start()
workers.append(worker)
- if self.original_order:
- for worker in workers:
- worker.join()
- else:
- self.WaitForShards()
+
+ for worker in workers:
+ worker.flag.wait(timeout=self.timeout)
+ if worker.is_alive():
+ # Print something to prevent buildbot from killing us
+ # because of no output for long time.
+ print 'TIMED OUT WAITING FOR SHARD, KILLING'
+ sys.stdout.flush()
+
+ # Make sure we count the timeout as a failure.
+ self.LogShardFailure(worker.shard_index)
+
+ worker.shard.kill()
+ worker.flag.wait(timeout=self.timeout)
+ else:
+ if worker.shard.returncode != 0:
+ self.LogShardFailure(worker.shard_index)
+
+ if worker.is_alive():
+ print 'LEAKING THREAD, SORRY'
+ else:
+ # Note: print guarantees there will be a newline at the end. This helps
+ # preventing mixed-up logs.
+ print worker.stdout
+
+ sys.stdout.flush()
# All the shards are done. Merge all the XML files and generate the
# main one.
@@ -403,134 +320,16 @@ class ShardingSupervisor(object):
self.WriteText(sys.stdout,
"\nFAILED SHARDS: %s\n" % str(self.failed_shards),
"\x1b[1;5;31m")
+ return 1
else:
self.WriteText(sys.stdout, "\nALL SHARDS PASSED!\n", "\x1b[1;5;32m")
- self.PrintSummary(self.failed_tests)
- if self.retry_percent < 0:
- return len(self.failed_shards) > 0
-
- self.failed_tests = [x for x in self.failed_tests if x.find("FAILS_") < 0]
- self.failed_tests = [x for x in self.failed_tests if x.find("FLAKY_") < 0]
- if not self.failed_tests:
return 0
- return self.RetryFailedTests()
-
- def LogTestFailure(self, line):
- """Saves a line in the lsit of failed tests to be printed at the end."""
- if line not in self.failed_tests:
- self.failed_tests.append(line)
def LogShardFailure(self, index):
"""Records that a test in the given shard has failed."""
- self.failed_shards.append(index)
-
- def WaitForShards(self):
- """Prints the output from each shard in consecutive order, waiting for
- the current shard to finish before starting on the next shard.
- """
- try:
- for shard_index in range(self.num_shards_to_run):
- while True:
- try:
- pipe, line = self.shard_output[shard_index].get(True, self.timeout)
- except Queue.Empty:
- # Shard timed out, notice failure and move on.
- self.LogShardFailure(shard_index)
- # TODO(maruel): Print last test. It'd be simpler to have the
- # processing in the main thread.
- # TODO(maruel): Make sure the worker thread terminates.
- sys.stdout.write('TIMED OUT\n\n')
- LogTestFailure(
- 'FAILURE: SHARD %d TIMED OUT; %d seconds' % (
- shard_index, self.timeout))
- break
- if line is self.SHARD_COMPLETED:
- break
- sys.stdout.write(line)
- except:
- sys.stdout.flush()
- print 'CAUGHT EXCEPTION: dumping remaining data:'
- for shard_index in range(self.num_shards_to_run):
- while True:
- try:
- pipe, line = self.shard_output[shard_index].get(False)
- except Queue.Empty:
- # Shard timed out, notice failure and move on.
- self.LogShardFailure(shard_index)
- break
- if line is self.SHARD_COMPLETED:
- break
- sys.stdout.write(line)
- raise
-
- def LogOutputLine(self, index, line, pipe=sys.stdout):
- """Either prints the shard output line immediately or saves it in the
- output buffer, depending on the settings. Also optionally adds a prefix.
- Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
- """
- # Fix up the index.
- array_index = index - (self.num_shards_to_run * self.slave_index)
- if self.prefix:
- line = "%i>%s" % (index, line)
- if self.original_order:
- pipe.write(line)
- else:
- self.shard_output[array_index].put((pipe, line))
-
- def IncrementTestCount(self):
- """Increments the number of tests run. This is relevant to the
- --retry-percent option.
- """
- self.test_counter.next()
-
- def ShardIndexCompleted(self, index):
- """Records that a shard has finished so the output from the next shard
- can now be printed.
- """
- # Fix up the index.
- array_index = index - (self.num_shards_to_run * self.slave_index)
- self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED))
-
- def RetryFailedTests(self):
- """Reruns any failed tests serially and prints another summary of the
- results if no more than retry_percent failed.
- """
- num_tests_run = self.test_counter.next()
- if len(self.failed_tests) > self.retry_percent * num_tests_run:
- sys.stdout.write("\nNOT RETRYING FAILED TESTS (too many failed)\n")
- return 1
- self.WriteText(sys.stdout, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m")
- sharded_description = re.compile(r": (?:\d+>)?(.*)")
- gtest_filters = [sharded_description.search(line).group(1)
- for line in self.failed_tests]
- failed_retries = []
-
- for test_filter in gtest_filters:
- args = [self.test, "--gtest_filter=" + test_filter]
- # Don't update the xml output files during retry.
- stripped_gtests_args = RemoveGTestOutput(self.gtest_args)
- args.extend(stripped_gtests_args)
- rerun = subprocess.Popen(args)
- rerun.wait()
- if rerun.returncode != 0:
- failed_retries.append(test_filter)
-
- self.WriteText(sys.stdout, "RETRY RESULTS:\n", "\x1b[1;5;33m")
- self.PrintSummary(failed_retries)
- return len(failed_retries) > 0
-
- def PrintSummary(self, failed_tests):
- """Prints a summary of the test results.
-
- If any shards had failing tests, the list is sorted and printed. Then all
- the lines that indicate a test failure are reproduced.
- """
- if failed_tests:
- self.WriteText(sys.stdout, "FAILED TESTS:\n", "\x1b[1;5;31m")
- for line in failed_tests:
- sys.stdout.write(line)
- else:
- self.WriteText(sys.stdout, "ALL TESTS PASSED!\n", "\x1b[1;5;32m")
+ # There might be some duplicates in case a shard times out.
+ if index not in self.failed_shards:
+ self.failed_shards.append(index)
def WriteText(self, pipe, text, ansi):
"""Writes the text to the pipe with the ansi escape code, if colored
@@ -562,19 +361,6 @@ def main():
parser.add_option(
"-s", "--runshard", type="int", help="single shard index to run")
parser.add_option(
- "--reorder", action="store_true",
- help="ensure that all output from an earlier shard is printed before"
- " output from a later shard")
- # TODO(charleslee): for backwards compatibility with master.cfg file
- parser.add_option(
- "--original-order", action="store_true",
- help="print shard output in its orginal jumbled order of execution"
- " (useful for debugging flaky tests)")
- parser.add_option(
- "--prefix", action="store_true",
- help="prefix each line of shard output with 'N>', where N is the shard"
- " index (forced True when --original-order is True)")
- parser.add_option(
"--random-seed", action="store_true",
help="shuffle the tests with a random seed value")
parser.add_option(
@@ -617,14 +403,6 @@ def main():
gtest_args = ["--gtest_color=%s" % {
True: "yes", False: "no"}[options.color]] + args[1:]
- if options.original_order:
- options.prefix = True
-
- # TODO(charleslee): for backwards compatibility with buildbot's log_parser
- if options.reorder:
- options.original_order = False
- options.prefix = True
-
if options.random_seed:
seed = random.randint(1, 99999)
gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed])
@@ -641,7 +419,7 @@ def main():
parser.error("Invalid shard number given parameters!")
shard = RunShard(
test, num_shards_to_run, options.runshard, gtest_args, None, None)
- shard.communicate()
+ shard.communicate(timeout=self.timeout)
return shard.poll()
# When running browser_tests, load the test binary into memory before running
@@ -661,8 +439,8 @@ def main():
# shard and run the whole test
ss = ShardingSupervisor(
test, num_shards_to_run, num_runs, options.color,
- options.original_order, options.prefix, options.retry_percent,
- options.timeout, options.total_slaves, options.slave_index, gtest_args)
+ options.retry_percent, options.timeout, options.total_slaves,
+ options.slave_index, gtest_args)
return ss.ShardTest()
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
deleted file mode 100644
index 737374d..0000000
--- a/tools/sharding_supervisor/stdio_buffer.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
-
-import cStringIO
-import os
-import sys
-import threading
-import Queue
-
-
-class StdioBuffer(object):
- def __init__(self, shard):
- self.line_ready_event = threading.Event()
- self.queue = Queue.Queue()
- self.lock = threading.Lock()
- self.completed = 0
- self.shard = shard
-
- def _pipe_handler(self, system_pipe, program_pipe):
- """Helper method for collecting stdio output. Output is collected until
- a newline is seen, at which point an event is triggered and the line is
- pushed to a buffer as a (stdio, line) tuple."""
- buffer = cStringIO.StringIO()
- pipe_running = True
- while pipe_running:
- char = program_pipe.read(1)
- if not char and self.shard.poll() is not None:
- pipe_running = False
- self.line_ready_event.set()
- buffer.write(char)
- if char == '\n' or not pipe_running:
- line = buffer.getvalue()
- if not line and not pipe_running:
- with self.lock:
- self.completed += 1
- self.line_ready_event.set()
- break
- self.queue.put((system_pipe, line))
- self.line_ready_event.set()
- buffer.close()
- buffer = cStringIO.StringIO()
-
- def handle_pipe(self, system_pipe, program_pipe):
- t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
- program_pipe])
- t.start()
- return t
-
- def readline(self):
- """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
- blocking call."""
- while self.completed < 2 and self.queue.empty():
- self.line_ready_event.wait()
- self.line_ready_event.clear()
- if not self.queue.empty():
- return self.queue.get_nowait()
- else:
- return (None, None) \ No newline at end of file