diff options
author | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-07 22:58:10 +0000 |
---|---|---|
committer | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-07 22:58:10 +0000 |
commit | da48400647dbc5bf743c8ef75c364b567b49898e (patch) | |
tree | 7e14eaf66942772ccfc3d81114386ac8ab2b5155 /tools | |
parent | 796c352e74203d62d84e2ba216ee3848b6527889 (diff) | |
download | chromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.zip chromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.tar.gz chromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.tar.bz2 |
GTTF: remove parts of sharding_supervisor that might be hang-prone.
BUG=none
TEST=none
Review URL: https://codereview.chromium.org/11367047
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@166538 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/sharding_supervisor/sharding_supervisor.py | 89 | ||||
-rw-r--r-- | tools/sharding_supervisor/stdio_buffer.py | 60 |
2 files changed, 20 insertions, 129 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py index 89d3ec5..94a0d78 100755 --- a/tools/sharding_supervisor/sharding_supervisor.py +++ b/tools/sharding_supervisor/sharding_supervisor.py @@ -24,20 +24,14 @@ import re import sys import threading -from stdio_buffer import StdioBuffer from xml.dom import minidom # Add tools/ to path BASE_PATH = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.join(BASE_PATH, "..")) -try: - import find_depot_tools - # Fixes a bug in Windows where some shards die upon starting - # TODO(charleslee): actually fix this bug - import subprocess2 as subprocess -except ImportError: - # Unable to find depot_tools, so just use standard subprocess - import subprocess + +import find_depot_tools +import subprocess2 as subprocess SS_USAGE = "python %prog [options] path/to/test [gtest_args]" SS_DEFAULT_NUM_CORES = 4 @@ -186,6 +180,7 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr): return subprocess.Popen( args, stdout=stdout, stderr=stderr, + shell=False, env=env, bufsize=0, universal_newlines=True) @@ -253,24 +248,14 @@ class ShardRunner(threading.Thread): index = self.counter.get_nowait() except Queue.Empty: break - shard_running = True shard = RunShard( self.supervisor.test, self.supervisor.total_shards, index, - self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE) - buffer = StdioBuffer(shard) - # Spawn two threads to collect stdio output - stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout) - stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr) - while shard_running: - pipe, line = buffer.readline() - if pipe is None and line is None: - shard_running = False - if not line and not shard_running: - break + self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) + stdout, _ = shard.communicate(timeout=self.supervisor.timeout) + return_code = shard.poll() + for line in stdout.splitlines(True): self.ProcessLine(index, line) - self.supervisor.LogOutputLine(index, line, pipe) - stdout_collector_thread.join() - stderr_collector_thread.join() + self.supervisor.LogOutputLine(index, line, None) if self.current_test: self.ReportFailure("INCOMPLETE", index, self.current_test) self.supervisor.ShardIndexCompleted(index) @@ -286,8 +271,6 @@ class ShardingSupervisor(object): num_shards_to_run: Total number of shards to split the test into. num_runs: Total number of worker threads to create for running shards. color: Indicates which coloring mode to use in the output. - original_order: True if shard output should be printed as it comes. - prefix: True if each line should indicate the shard index. retry_percent: Integer specifying the max percent of tests to retry. gtest_args: The options to pass to gtest. failed_tests: List of statements from shard output indicating a failure. @@ -312,9 +295,8 @@ class ShardingSupervisor(object): SHARD_COMPLETED = object() - def __init__(self, test, num_shards_to_run, num_runs, color, original_order, - prefix, retry_percent, timeout, total_slaves, slave_index, - gtest_args): + def __init__(self, test, num_shards_to_run, num_runs, color, + retry_percent, timeout, total_slaves, slave_index, gtest_args): """Inits ShardingSupervisor with given options and gtest arguments.""" self.test = test # Number of shards to run locally. @@ -324,8 +306,6 @@ class ShardingSupervisor(object): self.slave_index = slave_index self.num_runs = num_runs self.color = color - self.original_order = original_order - self.prefix = prefix self.retry_percent = retry_percent self.timeout = timeout self.gtest_args = gtest_args @@ -377,11 +357,7 @@ class ShardingSupervisor(object): self, counter, test_start, test_ok, test_fail) worker.start() workers.append(worker) - if self.original_order: - for worker in workers: - worker.join() - else: - self.WaitForShards() + self.WaitForShards() # All the shards are done. Merge all the XML files and generate the # main one. @@ -448,8 +424,8 @@ class ShardingSupervisor(object): break sys.stdout.write(line) except: - sys.stdout.flush() print 'CAUGHT EXCEPTION: dumping remaining data:' + sys.stdout.flush() for shard_index in range(self.num_shards_to_run): while True: try: @@ -461,21 +437,17 @@ class ShardingSupervisor(object): if line is self.SHARD_COMPLETED: break sys.stdout.write(line) + sys.stdout.flush() raise def LogOutputLine(self, index, line, pipe=sys.stdout): """Either prints the shard output line immediately or saves it in the - output buffer, depending on the settings. Also optionally adds a prefix. - Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue. + output buffer, depending on the settings. Adds a (sys.stdout, line) + or (sys.stderr, line) tuple in the output queue. """ # Fix up the index. array_index = index - (self.num_shards_to_run * self.slave_index) - if self.prefix: - line = "%i>%s" % (index, line) - if self.original_order: - pipe.write(line) - else: - self.shard_output[array_index].put((pipe, line)) + self.shard_output[array_index].put((pipe, line)) def IncrementTestCount(self): """Increments the number of tests run. This is relevant to the @@ -562,19 +534,6 @@ def main(): parser.add_option( "-s", "--runshard", type="int", help="single shard index to run") parser.add_option( - "--reorder", action="store_true", - help="ensure that all output from an earlier shard is printed before" - " output from a later shard") - # TODO(charleslee): for backwards compatibility with master.cfg file - parser.add_option( - "--original-order", action="store_true", - help="print shard output in its orginal jumbled order of execution" - " (useful for debugging flaky tests)") - parser.add_option( - "--prefix", action="store_true", - help="prefix each line of shard output with 'N>', where N is the shard" - " index (forced True when --original-order is True)") - parser.add_option( "--random-seed", action="store_true", help="shuffle the tests with a random seed value") parser.add_option( @@ -617,14 +576,6 @@ def main(): gtest_args = ["--gtest_color=%s" % { True: "yes", False: "no"}[options.color]] + args[1:] - if options.original_order: - options.prefix = True - - # TODO(charleslee): for backwards compatibility with buildbot's log_parser - if options.reorder: - options.original_order = False - options.prefix = True - if options.random_seed: seed = random.randint(1, 99999) gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed]) @@ -641,7 +592,7 @@ def main(): parser.error("Invalid shard number given parameters!") shard = RunShard( test, num_shards_to_run, options.runshard, gtest_args, None, None) - shard.communicate() + shard.communicate(timeout=self.timeout) return shard.poll() # When running browser_tests, load the test binary into memory before running @@ -661,8 +612,8 @@ def main(): # shard and run the whole test ss = ShardingSupervisor( test, num_shards_to_run, num_runs, options.color, - options.original_order, options.prefix, options.retry_percent, - options.timeout, options.total_slaves, options.slave_index, gtest_args) + options.retry_percent, options.timeout, options.total_slaves, + options.slave_index, gtest_args) return ss.ShardTest() diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py deleted file mode 100644 index 737374d..0000000 --- a/tools/sharding_supervisor/stdio_buffer.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2012 The Chromium Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Syncronized Standard IO Linebuffer implemented with cStringIO.""" - -import cStringIO -import os -import sys -import threading -import Queue - - -class StdioBuffer(object): - def __init__(self, shard): - self.line_ready_event = threading.Event() - self.queue = Queue.Queue() - self.lock = threading.Lock() - self.completed = 0 - self.shard = shard - - def _pipe_handler(self, system_pipe, program_pipe): - """Helper method for collecting stdio output. Output is collected until - a newline is seen, at which point an event is triggered and the line is - pushed to a buffer as a (stdio, line) tuple.""" - buffer = cStringIO.StringIO() - pipe_running = True - while pipe_running: - char = program_pipe.read(1) - if not char and self.shard.poll() is not None: - pipe_running = False - self.line_ready_event.set() - buffer.write(char) - if char == '\n' or not pipe_running: - line = buffer.getvalue() - if not line and not pipe_running: - with self.lock: - self.completed += 1 - self.line_ready_event.set() - break - self.queue.put((system_pipe, line)) - self.line_ready_event.set() - buffer.close() - buffer = cStringIO.StringIO() - - def handle_pipe(self, system_pipe, program_pipe): - t = threading.Thread(target=self._pipe_handler, args=[system_pipe, - program_pipe]) - t.start() - return t - - def readline(self): - """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a - blocking call.""" - while self.completed < 2 and self.queue.empty(): - self.line_ready_event.wait() - self.line_ready_event.clear() - if not self.queue.empty(): - return self.queue.get_nowait() - else: - return (None, None)
\ No newline at end of file |