author    maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>  2012-11-18 15:24:09 +0000
committer maruel@chromium.org <maruel@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>  2012-11-18 15:24:09 +0000
commit    b986432a3ae3c5619fe67f4644ce77d7a5aeb491 (patch)
tree      1cc85335f20d7f4fdd59455d4ae3b54b46646e40 /tools/sharding_supervisor
parent    5c809a3fe4a3f638c2ff78e18f1a1cc33a807e85 (diff)
Revert r168478 "Switch over to run_test_cases.py"
The commit was done only to see how much it'd blow up on the CI.

TBR=cmp@chromium.org,phajdan.jr@chromium.org
BUG=
Review URL: https://codereview.chromium.org/11280051

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168479 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/sharding_supervisor')
-rw-r--r--  tools/sharding_supervisor/PRESUBMIT.py                     |  24
-rw-r--r--  tools/sharding_supervisor/data/gtest_results.xml0          |  23
-rw-r--r--  tools/sharding_supervisor/data/gtest_results.xml1          |  19
-rw-r--r--  tools/sharding_supervisor/data/gtest_results_expected.xml  |  22
-rwxr-xr-x  tools/sharding_supervisor/dummy_test.py                    |  12
-rwxr-xr-x  tools/sharding_supervisor/sharding_supervisor.py           | 690
-rwxr-xr-x  tools/sharding_supervisor/sharding_supervisor_unittest.py  | 129
-rw-r--r--  tools/sharding_supervisor/stdio_buffer.py                  |  60
8 files changed, 912 insertions, 67 deletions
diff --git a/tools/sharding_supervisor/PRESUBMIT.py b/tools/sharding_supervisor/PRESUBMIT.py
deleted file mode 100644
index 2e12d77..0000000
--- a/tools/sharding_supervisor/PRESUBMIT.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Top-level presubmit script for sharding_supervisor.
-
-See http://dev.chromium.org/developers/how-tos/depottools/presubmit-scripts for
-details on the presubmit API built into gcl.
-"""
-
-def CommonChecks(input_api, output_api):
- output = []
- output.extend(input_api.canned_checks.RunPylint(input_api, output_api))
- return output
-
-
-def CheckChangeOnUpload(input_api, output_api):
- return CommonChecks(input_api, output_api)
-
-
-def CheckChangeOnCommit(input_api, output_api):
- output = CommonChecks(input_api, output_api)
- output.extend(input_api.canned_checks.PanProjectChecks(input_api, output_api))
- return output
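For context, these hooks are invoked by depot_tools when a change is uploaded or committed. A minimal sketch of exercising the deleted CommonChecks standalone, using stub objects (StubInputApi, StubOutputApi, and StubCannedChecks are hypothetical stand-ins, not depot_tools APIs):

    # Hypothetical stubs; the real input_api/output_api come from depot_tools.
    class StubCannedChecks(object):
      def RunPylint(self, input_api, output_api):
        return []  # The real canned check returns a list of presubmit results.

    class StubInputApi(object):
      canned_checks = StubCannedChecks()

    class StubOutputApi(object):
      pass

    print CommonChecks(StubInputApi(), StubOutputApi())  # Expect: []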
diff --git a/tools/sharding_supervisor/data/gtest_results.xml0 b/tools/sharding_supervisor/data/gtest_results.xml0
new file mode 100644
index 0000000..f6bf83a
--- /dev/null
+++ b/tools/sharding_supervisor/data/gtest_results.xml0
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<testsuites name="AllTests" tests="" failures="" disabled="" errors="" time="">
+ <!-- Suite that is run entirely on shard 0 -->
+ <testsuite name="Suite0" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite0" />
+ </testsuite>
+
+ <!-- Suite that is run entirely on shard 1 -->
+ <testsuite name="Suite1" tests="1" failures="" disabled="" errors="" time="">
+ </testsuite>
+
+ <!-- Suite that has tests run on both shard 0 and shard 1 -->
+ <testsuite name="Suite2" tests="2" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite2" />
+ </testsuite>
+
+ <!-- Suite that has a test run on both shard 0 and shard 1 -->
+ <testsuite name="Suite3" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite3">
+ <failure message="" type="" ignored="true"></failure>
+ </testcase>
+ </testsuite>
+</testsuites>
diff --git a/tools/sharding_supervisor/data/gtest_results.xml1 b/tools/sharding_supervisor/data/gtest_results.xml1
new file mode 100644
index 0000000..6d8bef8
--- /dev/null
+++ b/tools/sharding_supervisor/data/gtest_results.xml1
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<testsuites name="AllTests" tests="" failures="" disabled="" errors="" time="">
+ <!-- See comments in gtest_results.xml0 for what the different suites represent -->
+ <testsuite name="Suite0" tests="1" failures="" disabled="" errors="" time="">
+ </testsuite>
+ <testsuite name="Suite1" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="1" classname="Suite1">
+ <failure message="" type="" ignored="true"></failure>
+ </testcase>
+ </testsuite>
+ <testsuite name="Suite2" tests="2" failures="" disabled="" errors="" time="">
+ <testcase name="Test1" status="run" time="0" classname="Suite2" />
+ </testsuite>
+ <testsuite name="Suite3" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite3">
+ <failure message="" type="" ignored="true"></failure>
+ </testcase>
+ </testsuite>
+</testsuites>
diff --git a/tools/sharding_supervisor/data/gtest_results_expected.xml b/tools/sharding_supervisor/data/gtest_results_expected.xml
new file mode 100644
index 0000000..7af04d4
--- /dev/null
+++ b/tools/sharding_supervisor/data/gtest_results_expected.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<testsuites name="AllTests" tests="" failures="" disabled="" errors="" time="">
+ <!-- See comments in gtest_results.xml0 for what the different suites represent -->
+ <testsuite name="Suite0" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite0" />
+ </testsuite>
+ <testsuite name="Suite1" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="1" classname="Suite1">
+ <failure message="" type="" ignored="true"></failure>
+ </testcase>
+ </testsuite>
+ <testsuite name="Suite2" tests="2" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite2" />
+ <testcase name="Test1" status="run" time="0" classname="Suite2" />
+ </testsuite>
+ <testsuite name="Suite3" tests="1" failures="" disabled="" errors="" time="">
+ <testcase name="Test0" status="run" time="0" classname="Suite3">
+ <failure message="" type="" ignored="true"></failure>
+ <failure message="" type="" ignored="true"></failure>
+ </testcase>
+ </testsuite>
+</testsuites>
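Merging gtest_results.xml0 with gtest_results.xml1 should reproduce gtest_results_expected.xml; note how Suite3.Test0, which ran on both shards, ends up with both failure elements appended to a single testcase. A minimal sketch of the merge, mirroring test_append_to_xml further down (paths assume the tool's directory is the working directory):

    import sharding_supervisor

    # AppendToXML(final_xml, generic_path, shard) opens generic_path + str(shard),
    # so 'data/gtest_results.xml' resolves to the .xml0 and .xml1 fixtures above.
    merged = sharding_supervisor.AppendToXML(None, 'data/gtest_results.xml', 0)
    merged = sharding_supervisor.AppendToXML(merged, 'data/gtest_results.xml', 1)
    print merged.toprettyxml(indent='  ')  # Should match gtest_results_expected.xml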
diff --git a/tools/sharding_supervisor/dummy_test.py b/tools/sharding_supervisor/dummy_test.py
new file mode 100755
index 0000000..4b1d019
--- /dev/null
+++ b/tools/sharding_supervisor/dummy_test.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Dummy test used by the sharding supervisor unittests."""
+
+import os
+
+total = os.environ['GTEST_TOTAL_SHARDS']
+index = os.environ['GTEST_SHARD_INDEX']
+print 'Running shard %s of %s' % (index, total)
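GTEST_TOTAL_SHARDS and GTEST_SHARD_INDEX are gtest's stock sharding protocol: a real gtest binary runs test i only when i % GTEST_TOTAL_SHARDS == GTEST_SHARD_INDEX. A rough sketch of that round-robin partitioning (the test list is illustrative):

    import os

    def should_run(test_id):
      # Mirrors gtest's documented round-robin shard assignment.
      total = int(os.environ.get('GTEST_TOTAL_SHARDS', 1))
      index = int(os.environ.get('GTEST_SHARD_INDEX', 0))
      return test_id % total == index

    tests = ['Suite0.Test0', 'Suite1.Test0', 'Suite2.Test0', 'Suite2.Test1']
    for i, name in enumerate(tests):
      if should_run(i):
        print 'Running %s' % name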
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 6414a37..89d3ec5 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -3,64 +3,668 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-"""Defer to run_test_cases.py."""
+"""Shards a given test suite and runs the shards in parallel.
-import os
+ShardingSupervisor is called to process the command line options and creates
+the specified number of worker threads. These threads then run each shard of
+the test in a separate process and report on the results. When all the shards
+have been completed, the supervisor reprints any lines indicating a test
+failure for convenience. If only one shard is to be run, a single subprocess
+is started for that shard and the output is identical to gtest's output.
+"""
+
+
+import cStringIO
+import itertools
import optparse
+import os
+import Queue
+import random
+import re
import sys
+import threading
+
+from stdio_buffer import StdioBuffer
+from xml.dom import minidom
+
+# Add tools/ to path
+BASE_PATH = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.join(BASE_PATH, ".."))
+try:
+ import find_depot_tools
+ # Fixes a bug in Windows where some shards die upon starting
+ # TODO(charleslee): actually fix this bug
+ import subprocess2 as subprocess
+except ImportError:
+ # Unable to find depot_tools, so just use standard subprocess
+ import subprocess
+
+SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
+SS_DEFAULT_NUM_CORES = 4
+SS_DEFAULT_SHARDS_PER_CORE = 5 # num_shards = cores * SHARDS_PER_CORE
+SS_DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE
+SS_DEFAULT_RETRY_PERCENT = 5 # --retry-failed ignored if more than 5% fail
+SS_DEFAULT_TIMEOUT = 530 # Slightly less than buildbot's default 600 seconds
+
+
+def DetectNumCores():
+ """Detects the number of cores on the machine.
+
+ Returns:
+ The number of cores on the machine or SS_DEFAULT_NUM_CORES if it could not
+ be found.
+ """
+ try:
+ # Override on some Chromium Valgrind bots.
+ if "CHROME_VALGRIND_NUMCPUS" in os.environ:
+ return int(os.environ["CHROME_VALGRIND_NUMCPUS"])
+ # Linux, Unix, MacOS
+ if hasattr(os, "sysconf"):
+ if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
+ # Linux, Unix
+ return int(os.sysconf("SC_NPROCESSORS_ONLN"))
+ else:
+ # OSX
+ return int(os.popen2("sysctl -n hw.ncpu")[1].read())
+ # Windows
+ return int(os.environ["NUMBER_OF_PROCESSORS"])
+ except ValueError:
+ return SS_DEFAULT_NUM_CORES
+
+
+def GetGTestOutput(args):
+ """Extracts gtest_output from the args. Returns none if not present."""
+
+ for arg in args:
+ if '--gtest_output=' in arg:
+ return arg.split('=')[1]
+ return None
+
+
+def AppendToGTestOutput(gtest_args, value):
+ args = gtest_args[:]
+ current_value = GetGTestOutput(args)
+ if not current_value:
+ return gtest_args
+
+ current_arg = '--gtest_output=' + current_value
+ args.remove(current_arg)
+ args.append('--gtest_output=' + current_value + value)
+ return args
+
+
+def RemoveGTestOutput(gtest_args):
+ args = gtest_args[:]
+ current_value = GetGTestOutput(args)
+ if not current_value:
+ return gtest_args
+
+ args.remove('--gtest_output=' + current_value)
+ return args
+
+
+def AppendToXML(final_xml, generic_path, shard):
+ """Combine the shard xml file with the final xml file."""
+
+ path = generic_path + str(shard)
+
+ try:
+ with open(path) as shard_xml_file:
+ shard_xml = minidom.parse(shard_xml_file)
+ except IOError:
+ # If the shard crashed, gtest will not have generated an xml file.
+ return final_xml
+
+ if not final_xml:
+ # Our final xml is empty, let's prepopulate it with the first one we see.
+ return shard_xml
+
+ shard_node = shard_xml.documentElement
+ final_node = final_xml.documentElement
+
+ testcases = shard_node.getElementsByTagName('testcase')
+ final_testcases = final_node.getElementsByTagName('testcase')
+
+ final_testsuites = final_node.getElementsByTagName('testsuite')
+ final_testsuites_by_name = dict(
+ (suite.getAttribute('name'), suite) for suite in final_testsuites)
+
+ for testcase in testcases:
+ name = testcase.getAttribute('name')
+ classname = testcase.getAttribute('classname')
+ failures = testcase.getElementsByTagName('failure')
+ status = testcase.getAttribute('status')
+ elapsed = testcase.getAttribute('time')
+
+ # Don't bother updating the final xml if there is no data.
+ if status == 'notrun':
+ continue
+
+ # Look in our final xml to see if it's there.
+ # There has to be a better way...
+ merged_into_final_testcase = False
+ for final_testcase in final_testcases:
+ final_name = final_testcase.getAttribute('name')
+ final_classname = final_testcase.getAttribute('classname')
+ if final_name == name and final_classname == classname:
+ # We got the same entry.
+ final_testcase.setAttribute('status', status)
+ final_testcase.setAttribute('time', elapsed)
+ for failure in failures:
+ final_testcase.appendChild(failure)
+ merged_into_final_testcase = True
+
+ # We couldn't find an existing testcase to merge the results into, so we
+ # copy the node into the existing test suite.
+ if not merged_into_final_testcase:
+ testsuite = testcase.parentNode
+ final_testsuite = final_testsuites_by_name[testsuite.getAttribute('name')]
+ final_testsuite.appendChild(testcase)
+
+ return final_xml
+
+
+def RunShard(test, total_shards, index, gtest_args, stdout, stderr):
+ """Runs a single test shard in a subprocess.
+
+ Returns:
+ The Popen object representing the subprocess handle.
+ """
+ args = [test]
+
+ # If there is a --gtest_output argument, append the shard index to its value.
+ test_args = AppendToGTestOutput(gtest_args, str(index))
+ args.extend(test_args)
+ env = os.environ.copy()
+ env["GTEST_TOTAL_SHARDS"] = str(total_shards)
+ env["GTEST_SHARD_INDEX"] = str(index)
+
+ # Use a unique log file for each shard
+ # Allows ui_tests to be run in parallel on the same machine
+ env["CHROME_LOG_FILE"] = "chrome_log_%d" % index
+
+ return subprocess.Popen(
+ args, stdout=stdout,
+ stderr=stderr,
+ env=env,
+ bufsize=0,
+ universal_newlines=True)
+
+
+class ShardRunner(threading.Thread):
+ """Worker thread that manages a single shard at a time.
+
+ Attributes:
+ supervisor: The ShardingSupervisor that this worker reports to.
+ counter: Called to get the next shard index to run.
+ test_start: Regex that detects when a test runs.
+ test_ok: Regex that detects a passing test.
+ test_fail: Regex that detects a failing test.
+ current_test: The name of the currently running test.
+ """
+
+ def __init__(self, supervisor, counter, test_start, test_ok, test_fail):
+ """Inits ShardRunner and sets the current test to nothing."""
+ threading.Thread.__init__(self)
+ self.supervisor = supervisor
+ self.counter = counter
+ self.test_start = test_start
+ self.test_ok = test_ok
+ self.test_fail = test_fail
+ self.current_test = ""
-ROOT_DIR = os.path.dirname(
- os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ def ReportFailure(self, description, index, test_name):
+ """Assembles and reports a failure line to be printed later."""
+ log_line = "%s (%i): %s\n" % (description, index, test_name)
+ self.supervisor.LogTestFailure(log_line)
+ def ProcessLine(self, index, line):
+ """Checks a shard output line for test status, and reports a failure or
+ incomplete test if needed.
+ """
+ results = self.test_start.search(line)
+ if results:
+ if self.current_test:
+ self.ReportFailure("INCOMPLETE", index, self.current_test)
+ self.current_test = results.group(1)
+ self.supervisor.IncrementTestCount()
+ return
-def pop_gtest_output(args):
- """Extracts --gtest_output from the args if present."""
- for index, arg in enumerate(args):
- if arg.startswith('--gtest_output='):
- return args.pop(index)
+ results = self.test_ok.search(line)
+ if results:
+ self.current_test = ""
+ return
+
+ results = self.test_fail.search(line)
+ if results:
+ self.ReportFailure("FAILED", index, results.group(1))
+ self.current_test = ""
+
+ def run(self):
+ """Runs shards and outputs the results.
+
+ Gets the next shard index from the supervisor, runs it in a subprocess,
+ and collects the output. The output is read character by character in
+ case the shard crashes without an ending newline. Each line is processed
+ as it is finished.
+ """
+ while True:
+ try:
+ index = self.counter.get_nowait()
+ except Queue.Empty:
+ break
+ shard_running = True
+ shard = RunShard(
+ self.supervisor.test, self.supervisor.total_shards, index,
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
+ buffer = StdioBuffer(shard)
+ # Spawn two threads to collect stdio output
+ stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
+ stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
+ while shard_running:
+ pipe, line = buffer.readline()
+ if pipe is None and line is None:
+ shard_running = False
+ if not line and not shard_running:
+ break
+ self.ProcessLine(index, line)
+ self.supervisor.LogOutputLine(index, line, pipe)
+ stdout_collector_thread.join()
+ stderr_collector_thread.join()
+ if self.current_test:
+ self.ReportFailure("INCOMPLETE", index, self.current_test)
+ self.supervisor.ShardIndexCompleted(index)
+ if shard.returncode != 0:
+ self.supervisor.LogShardFailure(index)
+
+
+class ShardingSupervisor(object):
+ """Supervisor object that handles the worker threads.
+
+ Attributes:
+ test: Name of the test to shard.
+ num_shards_to_run: Total number of shards to split the test into.
+ num_runs: Total number of worker threads to create for running shards.
+ color: Indicates which coloring mode to use in the output.
+ original_order: True if shard output should be printed as it comes.
+ prefix: True if each line should indicate the shard index.
+ retry_percent: Integer specifying the max percent of tests to retry.
+ gtest_args: The options to pass to gtest.
+ failed_tests: List of statements from shard output indicating a failure.
+ failed_shards: List of shards that contained failing tests.
+ shards_completed: List of flags indicating which shards have finished.
+ shard_output: Buffer that stores output from each shard as (stdio, line).
+ test_counter: Stores the total number of tests run.
+ total_slaves: Total number of slaves running this test.
+ slave_index: Current slave to run tests for.
+
+ If total_slaves is set, we run only a subset of the tests. This is meant to be
+ used when we want to shard across machines as well as across cpus. In that
+ case the number of shards to execute will be the same, but they will be
+ smaller, as the total number of shards in the test suite will be multiplied
+ by 'total_slaves'.
+
+ For example, if you are on a quad core machine, the sharding supervisor by
+ default will use 20 shards for the whole suite. However, if you set
+ total_slaves to 2, it will split the suite in 40 shards and will only run
+ shards [0-19] or shards [20-39] depending if you set slave_index to 0 or 1.
+ """
+
+ SHARD_COMPLETED = object()
+
+ def __init__(self, test, num_shards_to_run, num_runs, color, original_order,
+ prefix, retry_percent, timeout, total_slaves, slave_index,
+ gtest_args):
+ """Inits ShardingSupervisor with given options and gtest arguments."""
+ self.test = test
+ # Number of shards to run locally.
+ self.num_shards_to_run = num_shards_to_run
+ # Total shards in the test suite running across all slaves.
+ self.total_shards = num_shards_to_run * total_slaves
+ self.slave_index = slave_index
+ self.num_runs = num_runs
+ self.color = color
+ self.original_order = original_order
+ self.prefix = prefix
+ self.retry_percent = retry_percent
+ self.timeout = timeout
+ self.gtest_args = gtest_args
+ self.failed_tests = []
+ self.failed_shards = []
+ self.shards_completed = [False] * self.num_shards_to_run
+ self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)]
+ self.test_counter = itertools.count()
+
+ def ShardTest(self):
+ """Runs the test and manages the worker threads.
+
+ Runs the test and outputs a summary at the end. All the tests in the
+ suite are run by creating (cores * runs_per_core) threads and
+ (cores * shards_per_core) shards. When all the worker threads have
+ finished, the lines saved in failed_tests are printed again. If enabled,
+ and failed tests that do not have FLAKY or FAILS in their names are run
+ again, serially, and the results are printed.
+
+ Returns:
+ 1 if some unexpected (not FLAKY or FAILS) tests failed, 0 otherwise.
+ """
+
+ # Regular expressions for parsing GTest logs. Test names look like
+ # SomeTestCase.SomeTest
+ # SomeName/SomeTestCase.SomeTest/1
+ # This regex also matches SomeName.SomeTest/1 and
+ # SomeName/SomeTestCase.SomeTest, which should be harmless.
+ test_name_regex = r"((\w+/)?\w+\.\w+(/\d+)?)"
+
+ # Regex for filtering out ANSI escape codes when using color.
+ ansi_regex = r"(?:\x1b\[.*?[a-zA-Z])?"
+
+ test_start = re.compile(
+ ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex)
+ test_ok = re.compile(
+ ansi_regex + r"\[\s+OK\s+\] " + ansi_regex + test_name_regex)
+ test_fail = re.compile(
+ ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex)
+
+ workers = []
+ counter = Queue.Queue()
+ start_point = self.num_shards_to_run * self.slave_index
+ for i in range(start_point, start_point + self.num_shards_to_run):
+ counter.put(i)
+
+ for i in range(self.num_runs):
+ worker = ShardRunner(
+ self, counter, test_start, test_ok, test_fail)
+ worker.start()
+ workers.append(worker)
+ if self.original_order:
+ for worker in workers:
+ worker.join()
+ else:
+ self.WaitForShards()
+
+ # All the shards are done. Merge all the XML files and generate the
+ # main one.
+ output_arg = GetGTestOutput(self.gtest_args)
+ if output_arg:
+ xml, xml_path = output_arg.split(':', 1)
+ assert(xml == 'xml')
+ final_xml = None
+ for i in range(start_point, start_point + self.num_shards_to_run):
+ final_xml = AppendToXML(final_xml, xml_path, i)
+
+ if final_xml:
+ with open(xml_path, 'w') as final_file:
+ final_xml.writexml(final_file)
+
+ num_failed = len(self.failed_shards)
+ if num_failed > 0:
+ self.failed_shards.sort()
+ self.WriteText(sys.stdout,
+ "\nFAILED SHARDS: %s\n" % str(self.failed_shards),
+ "\x1b[1;5;31m")
+ else:
+ self.WriteText(sys.stdout, "\nALL SHARDS PASSED!\n", "\x1b[1;5;32m")
+ self.PrintSummary(self.failed_tests)
+ if self.retry_percent < 0:
+ return len(self.failed_shards) > 0
+
+ self.failed_tests = [x for x in self.failed_tests if x.find("FAILS_") < 0]
+ self.failed_tests = [x for x in self.failed_tests if x.find("FLAKY_") < 0]
+ if not self.failed_tests:
+ return 0
+ return self.RetryFailedTests()
+
+ def LogTestFailure(self, line):
+ """Saves a line in the lsit of failed tests to be printed at the end."""
+ if line not in self.failed_tests:
+ self.failed_tests.append(line)
+
+ def LogShardFailure(self, index):
+ """Records that a test in the given shard has failed."""
+ self.failed_shards.append(index)
+
+ def WaitForShards(self):
+ """Prints the output from each shard in consecutive order, waiting for
+ the current shard to finish before starting on the next shard.
+ """
+ try:
+ for shard_index in range(self.num_shards_to_run):
+ while True:
+ try:
+ pipe, line = self.shard_output[shard_index].get(True, self.timeout)
+ except Queue.Empty:
+ # Shard timed out; record the failure and move on.
+ self.LogShardFailure(shard_index)
+ # TODO(maruel): Print last test. It'd be simpler to have the
+ # processing in the main thread.
+ # TODO(maruel): Make sure the worker thread terminates.
+ sys.stdout.write('TIMED OUT\n\n')
+ self.LogTestFailure(
+ 'FAILURE: SHARD %d TIMED OUT; %d seconds' % (
+ shard_index, self.timeout))
+ break
+ if line is self.SHARD_COMPLETED:
+ break
+ sys.stdout.write(line)
+ except:
+ sys.stdout.flush()
+ print 'CAUGHT EXCEPTION: dumping remaining data:'
+ for shard_index in range(self.num_shards_to_run):
+ while True:
+ try:
+ pipe, line = self.shard_output[shard_index].get(False)
+ except Queue.Empty:
+ # No more buffered output for this shard; log it and move on.
+ self.LogShardFailure(shard_index)
+ break
+ if line is self.SHARD_COMPLETED:
+ break
+ sys.stdout.write(line)
+ raise
+
+ def LogOutputLine(self, index, line, pipe=sys.stdout):
+ """Either prints the shard output line immediately or saves it in the
+ output buffer, depending on the settings. Also optionally adds a prefix.
+ Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
+ """
+ # Fix up the index.
+ array_index = index - (self.num_shards_to_run * self.slave_index)
+ if self.prefix:
+ line = "%i>%s" % (index, line)
+ if self.original_order:
+ pipe.write(line)
+ else:
+ self.shard_output[array_index].put((pipe, line))
+
+ def IncrementTestCount(self):
+ """Increments the number of tests run. This is relevant to the
+ --retry-percent option.
+ """
+ self.test_counter.next()
+
+ def ShardIndexCompleted(self, index):
+ """Records that a shard has finished so the output from the next shard
+ can now be printed.
+ """
+ # Fix up the index.
+ array_index = index - (self.num_shards_to_run * self.slave_index)
+ self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED))
+
+ def RetryFailedTests(self):
+ """Reruns any failed tests serially and prints another summary of the
+ results if no more than retry_percent failed.
+ """
+ num_tests_run = self.test_counter.next()
+ if len(self.failed_tests) > self.retry_percent * num_tests_run:
+ sys.stdout.write("\nNOT RETRYING FAILED TESTS (too many failed)\n")
+ return 1
+ self.WriteText(sys.stdout, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m")
+ sharded_description = re.compile(r": (?:\d+>)?(.*)")
+ gtest_filters = [sharded_description.search(line).group(1)
+ for line in self.failed_tests]
+ failed_retries = []
+
+ for test_filter in gtest_filters:
+ args = [self.test, "--gtest_filter=" + test_filter]
+ # Don't update the xml output files during retry.
+ stripped_gtests_args = RemoveGTestOutput(self.gtest_args)
+ args.extend(stripped_gtests_args)
+ rerun = subprocess.Popen(args)
+ rerun.wait()
+ if rerun.returncode != 0:
+ failed_retries.append(test_filter)
+
+ self.WriteText(sys.stdout, "RETRY RESULTS:\n", "\x1b[1;5;33m")
+ self.PrintSummary(failed_retries)
+ return len(failed_retries) > 0
+
+ def PrintSummary(self, failed_tests):
+ """Prints a summary of the test results.
+
+ If any shards had failing tests, the list is sorted and printed. Then all
+ the lines that indicate a test failure are reproduced.
+ """
+ if failed_tests:
+ self.WriteText(sys.stdout, "FAILED TESTS:\n", "\x1b[1;5;31m")
+ for line in failed_tests:
+ sys.stdout.write(line)
+ else:
+ self.WriteText(sys.stdout, "ALL TESTS PASSED!\n", "\x1b[1;5;32m")
+
+ def WriteText(self, pipe, text, ansi):
+ """Writes the text to the pipe with the ansi escape code, if colored
+ output is set, for Unix systems.
+ """
+ if self.color:
+ pipe.write(ansi)
+ pipe.write(text)
+ if self.color:
+ pipe.write("\x1b[m")
def main():
- parser = optparse.OptionParser()
-
- group = optparse.OptionGroup(
- parser, 'Compatibility flag with the old sharding_supervisor')
- group.add_option(
- '--no-color', action='store_true', help='Ignored')
- group.add_option(
- '--retry-failed', action='store_true', help='Ignored')
- group.add_option(
- '-t', '--timeout', type='int', help='Kept as --timeout')
- group.add_option(
- '--total-slaves', type='int', default=1, help='Converted to --index')
- group.add_option(
- '--slave-index', type='int', default=0, help='Converted to --shards')
- parser.add_option_group(group)
+ parser = optparse.OptionParser(usage=SS_USAGE)
+ parser.add_option(
+ "-n", "--shards_per_core", type="int", default=SS_DEFAULT_SHARDS_PER_CORE,
+ help="number of shards to generate per CPU")
+ parser.add_option(
+ "-r", "--runs_per_core", type="int", default=SS_DEFAULT_RUNS_PER_CORE,
+ help="number of shards to run in parallel per CPU")
+ parser.add_option(
+ "-c", "--color", action="store_true",
+ default=sys.platform != "win32" and sys.stdout.isatty(),
+ help="force color output, also used by gtest if --gtest_color is not"
+ " specified")
+ parser.add_option(
+ "--no-color", action="store_false", dest="color",
+ help="disable color output")
+ parser.add_option(
+ "-s", "--runshard", type="int", help="single shard index to run")
+ parser.add_option(
+ "--reorder", action="store_true",
+ help="ensure that all output from an earlier shard is printed before"
+ " output from a later shard")
+ # TODO(charleslee): for backwards compatibility with master.cfg file
+ parser.add_option(
+ "--original-order", action="store_true",
+ help="print shard output in its orginal jumbled order of execution"
+ " (useful for debugging flaky tests)")
+ parser.add_option(
+ "--prefix", action="store_true",
+ help="prefix each line of shard output with 'N>', where N is the shard"
+ " index (forced True when --original-order is True)")
+ parser.add_option(
+ "--random-seed", action="store_true",
+ help="shuffle the tests with a random seed value")
+ parser.add_option(
+ "--retry-failed", action="store_true",
+ help="retry tests that did not pass serially")
+ parser.add_option(
+ "--retry-percent", type="int",
+ default=SS_DEFAULT_RETRY_PERCENT,
+ help="ignore --retry-failed if more than this percent fail [0, 100]"
+ " (default = %i)" % SS_DEFAULT_RETRY_PERCENT)
+ parser.add_option(
+ "-t", "--timeout", type="int", default=SS_DEFAULT_TIMEOUT,
+ help="timeout in seconds to wait for a shard (default=%default s)")
+ parser.add_option(
+ "--total-slaves", type="int", default=1,
+ help="if running a subset, number of slaves sharing the test")
+ parser.add_option(
+ "--slave-index", type="int", default=0,
+ help="if running a subset, index of the slave to run tests for")
parser.disable_interspersed_args()
- options, args = parser.parse_args()
+ (options, args) = parser.parse_args()
+
+ if not args:
+ parser.error("You must specify a path to test!")
+ if not os.path.exists(args[0]):
+ parser.error("%s does not exist!" % args[0])
+
+ num_cores = DetectNumCores()
+
+ if options.shards_per_core < 1:
+ parser.error("You must have at least 1 shard per core!")
+ num_shards_to_run = num_cores * options.shards_per_core
+
+ if options.runs_per_core < 1:
+ parser.error("You must have at least 1 run per core!")
+ num_runs = num_cores * options.runs_per_core
+
+ test = args[0]
+ gtest_args = ["--gtest_color=%s" % {
+ True: "yes", False: "no"}[options.color]] + args[1:]
+
+ if options.original_order:
+ options.prefix = True
+
+ # TODO(charleslee): for backwards compatibility with buildbot's log_parser
+ if options.reorder:
+ options.original_order = False
+ options.prefix = True
+
+ if options.random_seed:
+ seed = random.randint(1, 99999)
+ gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed])
- swarm_client_dir = os.path.join(ROOT_DIR, 'tools', 'swarm_client')
- sys.path.insert(0, swarm_client_dir)
+ if options.retry_failed:
+ if options.retry_percent < 0 or options.retry_percent > 100:
+ parser.error("Retry percent must be an integer [0, 100]!")
+ else:
+ options.retry_percent = -1
- cmd = [
- '--shards', str(options.total_slaves),
- '--index', str(options.slave_index),
- '--no-dump',
- '--no-cr',
- ]
- if options.timeout is not None:
- cmd.extend(['--timeout', str(options.timeout)])
- gtest_output = pop_gtest_output(args)
- if gtest_output:
- # It is important that --gtest_output appears before the '--' so it is
- # properly processed by run_test_cases.
- cmd.append(gtest_output)
+ if options.runshard != None:
+ # run a single shard and exit
+ if (options.runshard < 0 or options.runshard >= num_shards_to_run):
+ parser.error("Invalid shard number given parameters!")
+ shard = RunShard(
+ test, num_shards_to_run, options.runshard, gtest_args, None, None)
+ shard.communicate()
+ return shard.poll()
- import run_test_cases # pylint: disable=F0401
+ # When running browser_tests, load the test binary into memory before running
+ # any tests. This is needed to prevent loading it from disk causing the first
+ # run tests to timeout flakily. See: http://crbug.com/124260
+ if "browser_tests" in test:
+ args = [test]
+ args.extend(gtest_args)
+ args.append("--warmup")
+ result = subprocess.call(args,
+ bufsize=0,
+ universal_newlines=True)
+ # If the test fails, don't run anything else.
+ if result != 0:
+ return result
- return run_test_cases.main(cmd + ['--'] + args)
+ # shard and run the whole test
+ ss = ShardingSupervisor(
+ test, num_shards_to_run, num_runs, options.color,
+ options.original_order, options.prefix, options.retry_percent,
+ options.timeout, options.total_slaves, options.slave_index, gtest_args)
+ return ss.ShardTest()
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
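A typical invocation, per SS_USAGE above, shards a gtest binary across every core and merges the per-shard XML back into one file, e.g. python sharding_supervisor.py --retry-failed path/to/base_unittests --gtest_output=xml:results.xml (the binary name is illustrative). The pass/fail accounting hinges on the three regexes built in ShardTest; a small sketch of them against hand-written gtest output lines (the sample lines are illustrative):

    import re

    test_name_regex = r"((\w+/)?\w+\.\w+(/\d+)?)"
    ansi_regex = r"(?:\x1b\[.*?[a-zA-Z])?"
    test_start = re.compile(
        ansi_regex + r"\[\s+RUN\s+\] " + ansi_regex + test_name_regex)
    test_fail = re.compile(
        ansi_regex + r"\[\s+FAILED\s+\] " + ansi_regex + test_name_regex)

    print test_start.search('[ RUN      ] Suite2.Test1').group(1)
    # -> Suite2.Test1
    print test_fail.search('[  FAILED  ] Param/Suite.Test/3').group(1)
    # -> Param/Suite.Test/3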
diff --git a/tools/sharding_supervisor/sharding_supervisor_unittest.py b/tools/sharding_supervisor/sharding_supervisor_unittest.py
new file mode 100755
index 0000000..77b1f54
--- /dev/null
+++ b/tools/sharding_supervisor/sharding_supervisor_unittest.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Verify basic usage of sharding_supervisor."""
+
+import difflib
+import os
+import subprocess
+import sys
+import unittest
+
+from xml.dom import minidom
+
+import sharding_supervisor
+
+ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
+SHARDING_SUPERVISOR = os.path.join(ROOT_DIR, 'sharding_supervisor.py')
+DUMMY_TEST = os.path.join(ROOT_DIR, 'dummy_test.py')
+NUM_CORES = sharding_supervisor.DetectNumCores()
+SHARDS_PER_CORE = sharding_supervisor.SS_DEFAULT_SHARDS_PER_CORE
+
+
+def generate_expected_output(start, end, num_shards):
+ """Generate the expected stdout and stderr for the dummy test."""
+ stdout = ''
+ stderr = ''
+ for i in range(start, end):
+ stdout += 'Running shard %d of %d%s' % (i, num_shards, os.linesep)
+ stdout += '%sALL SHARDS PASSED!%sALL TESTS PASSED!%s' % (os.linesep,
+ os.linesep,
+ os.linesep)
+
+ return (stdout, stderr)
+
+
+class ShardingSupervisorUnittest(unittest.TestCase):
+ def test_basic_run(self):
+ # Default test.
+ expected_shards = NUM_CORES * SHARDS_PER_CORE
+ (expected_out, expected_err) = generate_expected_output(
+ 0, expected_shards, expected_shards)
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
+ DUMMY_TEST], stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (out, err) = p.communicate()
+ self.assertEqual(expected_out, out)
+ self.assertEqual(expected_err, err)
+ self.assertEqual(0, p.returncode)
+
+ def test_shard_per_core(self):
+ """Test the --shards_per_core parameter."""
+ expected_shards = NUM_CORES * 25
+ (expected_out, expected_err) = generate_expected_output(
+ 0, expected_shards, expected_shards)
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
+ '--shards_per_core', '25', DUMMY_TEST],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ (out, err) = p.communicate()
+ self.assertEqual(expected_out, out)
+ self.assertEqual(expected_err, err)
+ self.assertEqual(0, p.returncode)
+
+ def test_slave_sharding(self):
+ """Test the --total-slaves and --slave-index parameters."""
+ total_shards = 6
+ expected_shards = NUM_CORES * SHARDS_PER_CORE * total_shards
+
+ # Test every single index to make sure they run correctly.
+ for index in range(total_shards):
+ begin = NUM_CORES * SHARDS_PER_CORE * index
+ end = begin + NUM_CORES * SHARDS_PER_CORE
+ (expected_out, expected_err) = generate_expected_output(
+ begin, end, expected_shards)
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
+ '--total-slaves', str(total_shards),
+ '--slave-index', str(index),
+ DUMMY_TEST],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ (out, err) = p.communicate()
+ self.assertEqual(expected_out, out)
+ self.assertEqual(expected_err, err)
+ self.assertEqual(0, p.returncode)
+
+ def test_append_to_xml(self):
+ shard_xml_path = os.path.join(ROOT_DIR, 'data', 'gtest_results.xml')
+ expected_xml_path = os.path.join(
+ ROOT_DIR, 'data', 'gtest_results_expected.xml')
+ merged_xml = sharding_supervisor.AppendToXML(None, shard_xml_path, 0)
+ merged_xml = sharding_supervisor.AppendToXML(merged_xml, shard_xml_path, 1)
+
+ with open(expected_xml_path) as expected_xml_file:
+ expected_xml = minidom.parse(expected_xml_file)
+
+ # Serialize XML to a list of strings that is consistently formatted
+ # (ignoring whitespace between elements) so that it may be compared.
+ def _serialize_xml(xml):
+ def _remove_whitespace_and_comments(xml):
+ children_to_remove = []
+ for child in xml.childNodes:
+ if (child.nodeType == minidom.Node.TEXT_NODE and
+ not child.data.strip()):
+ children_to_remove.append(child)
+ elif child.nodeType == minidom.Node.COMMENT_NODE:
+ children_to_remove.append(child)
+ elif child.nodeType == minidom.Node.ELEMENT_NODE:
+ _remove_whitespace_and_comments(child)
+
+ for child in children_to_remove:
+ xml.removeChild(child)
+
+ _remove_whitespace_and_comments(xml)
+ return xml.toprettyxml(indent=' ').splitlines()
+
+ diff = list(difflib.unified_diff(
+ _serialize_xml(expected_xml),
+ _serialize_xml(merged_xml),
+ fromfile='gtest_results_expected.xml',
+ tofile='gtest_results_actual.xml'))
+ if diff:
+ self.fail('Did not merge results XML correctly:\n' + '\n'.join(diff))
+
+
+if __name__ == '__main__':
+ unittest.main()
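The suite can be run directly with python sharding_supervisor_unittest.py; the first three cases shell out to sharding_supervisor.py against dummy_test.py with real worker threads and subprocesses, while test_append_to_xml exercises the XML merge in-process against the data/ fixtures.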
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
new file mode 100644
index 0000000..737374d
--- /dev/null
+++ b/tools/sharding_supervisor/stdio_buffer.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
+
+import cStringIO
+import os
+import sys
+import threading
+import Queue
+
+
+class StdioBuffer(object):
+ def __init__(self, shard):
+ self.line_ready_event = threading.Event()
+ self.queue = Queue.Queue()
+ self.lock = threading.Lock()
+ self.completed = 0
+ self.shard = shard
+
+ def _pipe_handler(self, system_pipe, program_pipe):
+ """Helper method for collecting stdio output. Output is collected until
+ a newline is seen, at which point an event is triggered and the line is
+ pushed to a buffer as a (stdio, line) tuple."""
+ buffer = cStringIO.StringIO()
+ pipe_running = True
+ while pipe_running:
+ char = program_pipe.read(1)
+ if not char and self.shard.poll() is not None:
+ pipe_running = False
+ self.line_ready_event.set()
+ buffer.write(char)
+ if char == '\n' or not pipe_running:
+ line = buffer.getvalue()
+ if not line and not pipe_running:
+ with self.lock:
+ self.completed += 1
+ self.line_ready_event.set()
+ break
+ self.queue.put((system_pipe, line))
+ self.line_ready_event.set()
+ buffer.close()
+ buffer = cStringIO.StringIO()
+
+ def handle_pipe(self, system_pipe, program_pipe):
+ t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
+ program_pipe])
+ t.start()
+ return t
+
+ def readline(self):
+ """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
+ blocking call."""
+ while self.completed < 2 and self.queue.empty():
+ self.line_ready_event.wait()
+ self.line_ready_event.clear()
+ if not self.queue.empty():
+ return self.queue.get_nowait()
+ else:
+ return (None, None)
\ No newline at end of file
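Usage mirrors ShardRunner.run above: wire both pipes of a subprocess through one buffer, then drain it with readline() until the (None, None) sentinel says both collector threads have finished. A minimal sketch (the child command is illustrative):

    import subprocess
    import sys

    from stdio_buffer import StdioBuffer

    proc = subprocess.Popen(
        [sys.executable, '-c', 'print "hello from the shard"'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        bufsize=0, universal_newlines=True)
    buffer = StdioBuffer(proc)
    stdout_thread = buffer.handle_pipe(sys.stdout, proc.stdout)
    stderr_thread = buffer.handle_pipe(sys.stderr, proc.stderr)
    while True:
      pipe, line = buffer.readline()  # Blocks until a line or EOF on both pipes.
      if pipe is None and line is None:
        break
      pipe.write(line)
    stdout_thread.join()
    stderr_thread.join()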