summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-07 22:58:10 +0000
committerphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-07 22:58:10 +0000
commitda48400647dbc5bf743c8ef75c364b567b49898e (patch)
tree7e14eaf66942772ccfc3d81114386ac8ab2b5155 /tools
parent796c352e74203d62d84e2ba216ee3848b6527889 (diff)
downloadchromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.zip
chromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.tar.gz
chromium_src-da48400647dbc5bf743c8ef75c364b567b49898e.tar.bz2
GTTF: remove parts of sharding_supervisor that might be hang-prone.
BUG=none TEST=none Review URL: https://codereview.chromium.org/11367047 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@166538 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor.py89
-rw-r--r--tools/sharding_supervisor/stdio_buffer.py60
2 files changed, 20 insertions, 129 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 89d3ec5..94a0d78 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -24,20 +24,14 @@ import re
import sys
import threading
-from stdio_buffer import StdioBuffer
from xml.dom import minidom
# Add tools/ to path
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(BASE_PATH, ".."))
-try:
- import find_depot_tools
- # Fixes a bug in Windows where some shards die upon starting
- # TODO(charleslee): actually fix this bug
- import subprocess2 as subprocess
-except ImportError:
- # Unable to find depot_tools, so just use standard subprocess
- import subprocess
+
+import find_depot_tools
+import subprocess2 as subprocess
SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
SS_DEFAULT_NUM_CORES = 4
@@ -186,6 +180,7 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr):
return subprocess.Popen(
args, stdout=stdout,
stderr=stderr,
+ shell=False,
env=env,
bufsize=0,
universal_newlines=True)
@@ -253,24 +248,14 @@ class ShardRunner(threading.Thread):
index = self.counter.get_nowait()
except Queue.Empty:
break
- shard_running = True
shard = RunShard(
self.supervisor.test, self.supervisor.total_shards, index,
- self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
- buffer = StdioBuffer(shard)
- # Spawn two threads to collect stdio output
- stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
- stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
- while shard_running:
- pipe, line = buffer.readline()
- if pipe is None and line is None:
- shard_running = False
- if not line and not shard_running:
- break
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
+ stdout, _ = shard.communicate(timeout=self.supervisor.timeout)
+ return_code = shard.poll()
+ for line in stdout.splitlines(True):
self.ProcessLine(index, line)
- self.supervisor.LogOutputLine(index, line, pipe)
- stdout_collector_thread.join()
- stderr_collector_thread.join()
+ self.supervisor.LogOutputLine(index, line, None)
if self.current_test:
self.ReportFailure("INCOMPLETE", index, self.current_test)
self.supervisor.ShardIndexCompleted(index)
@@ -286,8 +271,6 @@ class ShardingSupervisor(object):
num_shards_to_run: Total number of shards to split the test into.
num_runs: Total number of worker threads to create for running shards.
color: Indicates which coloring mode to use in the output.
- original_order: True if shard output should be printed as it comes.
- prefix: True if each line should indicate the shard index.
retry_percent: Integer specifying the max percent of tests to retry.
gtest_args: The options to pass to gtest.
failed_tests: List of statements from shard output indicating a failure.
@@ -312,9 +295,8 @@ class ShardingSupervisor(object):
SHARD_COMPLETED = object()
- def __init__(self, test, num_shards_to_run, num_runs, color, original_order,
- prefix, retry_percent, timeout, total_slaves, slave_index,
- gtest_args):
+ def __init__(self, test, num_shards_to_run, num_runs, color,
+ retry_percent, timeout, total_slaves, slave_index, gtest_args):
"""Inits ShardingSupervisor with given options and gtest arguments."""
self.test = test
# Number of shards to run locally.
@@ -324,8 +306,6 @@ class ShardingSupervisor(object):
self.slave_index = slave_index
self.num_runs = num_runs
self.color = color
- self.original_order = original_order
- self.prefix = prefix
self.retry_percent = retry_percent
self.timeout = timeout
self.gtest_args = gtest_args
@@ -377,11 +357,7 @@ class ShardingSupervisor(object):
self, counter, test_start, test_ok, test_fail)
worker.start()
workers.append(worker)
- if self.original_order:
- for worker in workers:
- worker.join()
- else:
- self.WaitForShards()
+ self.WaitForShards()
# All the shards are done. Merge all the XML files and generate the
# main one.
@@ -448,8 +424,8 @@ class ShardingSupervisor(object):
break
sys.stdout.write(line)
except:
- sys.stdout.flush()
print 'CAUGHT EXCEPTION: dumping remaining data:'
+ sys.stdout.flush()
for shard_index in range(self.num_shards_to_run):
while True:
try:
@@ -461,21 +437,17 @@ class ShardingSupervisor(object):
if line is self.SHARD_COMPLETED:
break
sys.stdout.write(line)
+ sys.stdout.flush()
raise
def LogOutputLine(self, index, line, pipe=sys.stdout):
"""Either prints the shard output line immediately or saves it in the
- output buffer, depending on the settings. Also optionally adds a prefix.
- Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
+ output buffer, depending on the settings. Adds a (sys.stdout, line)
+ or (sys.stderr, line) tuple in the output queue.
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
- if self.prefix:
- line = "%i>%s" % (index, line)
- if self.original_order:
- pipe.write(line)
- else:
- self.shard_output[array_index].put((pipe, line))
+ self.shard_output[array_index].put((pipe, line))
def IncrementTestCount(self):
"""Increments the number of tests run. This is relevant to the
@@ -562,19 +534,6 @@ def main():
parser.add_option(
"-s", "--runshard", type="int", help="single shard index to run")
parser.add_option(
- "--reorder", action="store_true",
- help="ensure that all output from an earlier shard is printed before"
- " output from a later shard")
- # TODO(charleslee): for backwards compatibility with master.cfg file
- parser.add_option(
- "--original-order", action="store_true",
- help="print shard output in its orginal jumbled order of execution"
- " (useful for debugging flaky tests)")
- parser.add_option(
- "--prefix", action="store_true",
- help="prefix each line of shard output with 'N>', where N is the shard"
- " index (forced True when --original-order is True)")
- parser.add_option(
"--random-seed", action="store_true",
help="shuffle the tests with a random seed value")
parser.add_option(
@@ -617,14 +576,6 @@ def main():
gtest_args = ["--gtest_color=%s" % {
True: "yes", False: "no"}[options.color]] + args[1:]
- if options.original_order:
- options.prefix = True
-
- # TODO(charleslee): for backwards compatibility with buildbot's log_parser
- if options.reorder:
- options.original_order = False
- options.prefix = True
-
if options.random_seed:
seed = random.randint(1, 99999)
gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed])
@@ -641,7 +592,7 @@ def main():
parser.error("Invalid shard number given parameters!")
shard = RunShard(
test, num_shards_to_run, options.runshard, gtest_args, None, None)
- shard.communicate()
+ shard.communicate(timeout=self.timeout)
return shard.poll()
# When running browser_tests, load the test binary into memory before running
@@ -661,8 +612,8 @@ def main():
# shard and run the whole test
ss = ShardingSupervisor(
test, num_shards_to_run, num_runs, options.color,
- options.original_order, options.prefix, options.retry_percent,
- options.timeout, options.total_slaves, options.slave_index, gtest_args)
+ options.retry_percent, options.timeout, options.total_slaves,
+ options.slave_index, gtest_args)
return ss.ShardTest()
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
deleted file mode 100644
index 737374d..0000000
--- a/tools/sharding_supervisor/stdio_buffer.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright (c) 2012 The Chromium Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
-
-import cStringIO
-import os
-import sys
-import threading
-import Queue
-
-
-class StdioBuffer(object):
- def __init__(self, shard):
- self.line_ready_event = threading.Event()
- self.queue = Queue.Queue()
- self.lock = threading.Lock()
- self.completed = 0
- self.shard = shard
-
- def _pipe_handler(self, system_pipe, program_pipe):
- """Helper method for collecting stdio output. Output is collected until
- a newline is seen, at which point an event is triggered and the line is
- pushed to a buffer as a (stdio, line) tuple."""
- buffer = cStringIO.StringIO()
- pipe_running = True
- while pipe_running:
- char = program_pipe.read(1)
- if not char and self.shard.poll() is not None:
- pipe_running = False
- self.line_ready_event.set()
- buffer.write(char)
- if char == '\n' or not pipe_running:
- line = buffer.getvalue()
- if not line and not pipe_running:
- with self.lock:
- self.completed += 1
- self.line_ready_event.set()
- break
- self.queue.put((system_pipe, line))
- self.line_ready_event.set()
- buffer.close()
- buffer = cStringIO.StringIO()
-
- def handle_pipe(self, system_pipe, program_pipe):
- t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
- program_pipe])
- t.start()
- return t
-
- def readline(self):
- """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
- blocking call."""
- while self.completed < 2 and self.queue.empty():
- self.line_ready_event.wait()
- self.line_ready_event.clear()
- if not self.queue.empty():
- return self.queue.get_nowait()
- else:
- return (None, None) \ No newline at end of file