diff options
author | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-08 00:03:37 +0000 |
---|---|---|
committer | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-08 00:03:37 +0000 |
commit | eddb05283c16ca117e86d1d356570b6fbb1d992f (patch) | |
tree | 71bfa9ff438ad28100bfb587327b564dfdcaf418 /tools | |
parent | be695a8499d69da452d530ebdba8f4a0a02694a5 (diff) | |
download | chromium_src-eddb05283c16ca117e86d1d356570b6fbb1d992f.zip chromium_src-eddb05283c16ca117e86d1d356570b6fbb1d992f.tar.gz chromium_src-eddb05283c16ca117e86d1d356570b6fbb1d992f.tar.bz2 |
Revert "GTTF: remove parts of sharding_supervisor that might be hang-prone."
This reverts commit 245def1fd6e955d439fb5b79f4a4fd7f025ed5c6.
BUG=
Review URL: https://codereview.chromium.org/11275204
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@166554 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/sharding_supervisor/sharding_supervisor.py | 89 | ||||
-rw-r--r-- | tools/sharding_supervisor/stdio_buffer.py | 60 |
2 files changed, 129 insertions, 20 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py index 94a0d78..89d3ec5 100755 --- a/tools/sharding_supervisor/sharding_supervisor.py +++ b/tools/sharding_supervisor/sharding_supervisor.py @@ -24,14 +24,20 @@ import re import sys import threading +from stdio_buffer import StdioBuffer from xml.dom import minidom # Add tools/ to path BASE_PATH = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(BASE_PATH, "..")) - -import find_depot_tools -import subprocess2 as subprocess +try: + import find_depot_tools + # Fixes a bug in Windows where some shards die upon starting + # TODO(charleslee): actually fix this bug + import subprocess2 as subprocess +except ImportError: + # Unable to find depot_tools, so just use standard subprocess + import subprocess SS_USAGE = "python %prog [options] path/to/test [gtest_args]" SS_DEFAULT_NUM_CORES = 4 @@ -180,7 +186,6 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr): return subprocess.Popen( args, stdout=stdout, stderr=stderr, - shell=False, env=env, bufsize=0, universal_newlines=True) @@ -248,14 +253,24 @@ class ShardRunner(threading.Thread): index = self.counter.get_nowait() except Queue.Empty: break + shard_running = True shard = RunShard( self.supervisor.test, self.supervisor.total_shards, index, - self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) - stdout, _ = shard.communicate(timeout=self.supervisor.timeout) - return_code = shard.poll() - for line in stdout.splitlines(True): + self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE) + buffer = StdioBuffer(shard) + # Spawn two threads to collect stdio output + stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout) + stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr) + while shard_running: + pipe, line = buffer.readline() + if pipe is None and line is None: + shard_running = False + if not line and not shard_running: + break self.ProcessLine(index, line) - self.supervisor.LogOutputLine(index, line, None) + self.supervisor.LogOutputLine(index, line, pipe) + stdout_collector_thread.join() + stderr_collector_thread.join() if self.current_test: self.ReportFailure("INCOMPLETE", index, self.current_test) self.supervisor.ShardIndexCompleted(index) @@ -271,6 +286,8 @@ class ShardingSupervisor(object): num_shards_to_run: Total number of shards to split the test into. num_runs: Total number of worker threads to create for running shards. color: Indicates which coloring mode to use in the output. + original_order: True if shard output should be printed as it comes. + prefix: True if each line should indicate the shard index. retry_percent: Integer specifying the max percent of tests to retry. gtest_args: The options to pass to gtest. failed_tests: List of statements from shard output indicating a failure. @@ -295,8 +312,9 @@ class ShardingSupervisor(object): SHARD_COMPLETED = object() - def __init__(self, test, num_shards_to_run, num_runs, color, - retry_percent, timeout, total_slaves, slave_index, gtest_args): + def __init__(self, test, num_shards_to_run, num_runs, color, original_order, + prefix, retry_percent, timeout, total_slaves, slave_index, + gtest_args): """Inits ShardingSupervisor with given options and gtest arguments.""" self.test = test # Number of shards to run locally. @@ -306,6 +324,8 @@ class ShardingSupervisor(object): self.slave_index = slave_index self.num_runs = num_runs self.color = color + self.original_order = original_order + self.prefix = prefix self.retry_percent = retry_percent self.timeout = timeout self.gtest_args = gtest_args @@ -357,7 +377,11 @@ class ShardingSupervisor(object): self, counter, test_start, test_ok, test_fail) worker.start() workers.append(worker) - self.WaitForShards() + if self.original_order: + for worker in workers: + worker.join() + else: + self.WaitForShards() # All the shards are done. Merge all the XML files and generate the # main one. @@ -424,8 +448,8 @@ class ShardingSupervisor(object): break sys.stdout.write(line) except: - print 'CAUGHT EXCEPTION: dumping remaining data:' sys.stdout.flush() + print 'CAUGHT EXCEPTION: dumping remaining data:' for shard_index in range(self.num_shards_to_run): while True: try: @@ -437,17 +461,21 @@ class ShardingSupervisor(object): if line is self.SHARD_COMPLETED: break sys.stdout.write(line) - sys.stdout.flush() raise def LogOutputLine(self, index, line, pipe=sys.stdout): """Either prints the shard output line immediately or saves it in the - output buffer, depending on the settings. Adds a (sys.stdout, line) - or (sys.stderr, line) tuple in the output queue. + output buffer, depending on the settings. Also optionally adds a prefix. + Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue. """ # Fix up the index. array_index = index - (self.num_shards_to_run * self.slave_index) - self.shard_output[array_index].put((pipe, line)) + if self.prefix: + line = "%i>%s" % (index, line) + if self.original_order: + pipe.write(line) + else: + self.shard_output[array_index].put((pipe, line)) def IncrementTestCount(self): """Increments the number of tests run. This is relevant to the @@ -534,6 +562,19 @@ def main(): parser.add_option( "-s", "--runshard", type="int", help="single shard index to run") parser.add_option( + "--reorder", action="store_true", + help="ensure that all output from an earlier shard is printed before" + " output from a later shard") + # TODO(charleslee): for backwards compatibility with master.cfg file + parser.add_option( + "--original-order", action="store_true", + help="print shard output in its orginal jumbled order of execution" + " (useful for debugging flaky tests)") + parser.add_option( + "--prefix", action="store_true", + help="prefix each line of shard output with 'N>', where N is the shard" + " index (forced True when --original-order is True)") + parser.add_option( "--random-seed", action="store_true", help="shuffle the tests with a random seed value") parser.add_option( @@ -576,6 +617,14 @@ def main(): gtest_args = ["--gtest_color=%s" % { True: "yes", False: "no"}[options.color]] + args[1:] + if options.original_order: + options.prefix = True + + # TODO(charleslee): for backwards compatibility with buildbot's log_parser + if options.reorder: + options.original_order = False + options.prefix = True + if options.random_seed: seed = random.randint(1, 99999) gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed]) @@ -592,7 +641,7 @@ def main(): parser.error("Invalid shard number given parameters!") shard = RunShard( test, num_shards_to_run, options.runshard, gtest_args, None, None) - shard.communicate(timeout=self.timeout) + shard.communicate() return shard.poll() # When running browser_tests, load the test binary into memory before running @@ -612,8 +661,8 @@ def main(): # shard and run the whole test ss = ShardingSupervisor( test, num_shards_to_run, num_runs, options.color, - options.retry_percent, options.timeout, options.total_slaves, - options.slave_index, gtest_args) + options.original_order, options.prefix, options.retry_percent, + options.timeout, options.total_slaves, options.slave_index, gtest_args) return ss.ShardTest() diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py new file mode 100644 index 0000000..737374d --- /dev/null +++ b/tools/sharding_supervisor/stdio_buffer.py @@ -0,0 +1,60 @@ +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Syncronized Standard IO Linebuffer implemented with cStringIO.""" + +import cStringIO +import os +import sys +import threading +import Queue + + +class StdioBuffer(object): + def __init__(self, shard): + self.line_ready_event = threading.Event() + self.queue = Queue.Queue() + self.lock = threading.Lock() + self.completed = 0 + self.shard = shard + + def _pipe_handler(self, system_pipe, program_pipe): + """Helper method for collecting stdio output. Output is collected until + a newline is seen, at which point an event is triggered and the line is + pushed to a buffer as a (stdio, line) tuple.""" + buffer = cStringIO.StringIO() + pipe_running = True + while pipe_running: + char = program_pipe.read(1) + if not char and self.shard.poll() is not None: + pipe_running = False + self.line_ready_event.set() + buffer.write(char) + if char == '\n' or not pipe_running: + line = buffer.getvalue() + if not line and not pipe_running: + with self.lock: + self.completed += 1 + self.line_ready_event.set() + break + self.queue.put((system_pipe, line)) + self.line_ready_event.set() + buffer.close() + buffer = cStringIO.StringIO() + + def handle_pipe(self, system_pipe, program_pipe): + t = threading.Thread(target=self._pipe_handler, args=[system_pipe, + program_pipe]) + t.start() + return t + + def readline(self): + """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a + blocking call.""" + while self.completed < 2 and self.queue.empty(): + self.line_ready_event.wait() + self.line_ready_event.clear() + if not self.queue.empty(): + return self.queue.get_nowait() + else: + return (None, None)
\ No newline at end of file |