author     phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>   2012-11-08 00:03:37 +0000
committer  phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>   2012-11-08 00:03:37 +0000
commit     eddb05283c16ca117e86d1d356570b6fbb1d992f
tree       71bfa9ff438ad28100bfb587327b564dfdcaf418 /tools
parent     be695a8499d69da452d530ebdba8f4a0a02694a5
Revert "GTTF: remove parts of sharding_supervisor that might be hang-prone."
This reverts commit 245def1fd6e955d439fb5b79f4a4fd7f025ed5c6.

BUG=
Review URL: https://codereview.chromium.org/11275204

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@166554 0039d316-1c4b-4281-b951-d872f2087c98
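For context on what this revert restores: the supervisor shards a gtest binary by launching several copies of it and telling each copy which slice of the test list it owns via the GTEST_TOTAL_SHARDS and GTEST_SHARD_INDEX environment variables. A minimal sketch of that protocol follows; run_one_shard() is a hypothetical helper used only for illustration, the real supervisor builds a similar environment inside RunShard().

# Sketch only: gtest's built-in sharding reads these environment variables.
import os
import subprocess

def run_one_shard(test_binary, total_shards, index):
  # Hypothetical helper, not part of this change.
  env = os.environ.copy()
  env['GTEST_TOTAL_SHARDS'] = str(total_shards)
  env['GTEST_SHARD_INDEX'] = str(index)
  return subprocess.Popen([test_binary], env=env, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=True)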
Diffstat (limited to 'tools')
-rwxr-xr-x  tools/sharding_supervisor/sharding_supervisor.py   89
-rw-r--r--  tools/sharding_supervisor/stdio_buffer.py           60
2 files changed, 129 insertions(+), 20 deletions(-)
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 94a0d78..89d3ec5 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -24,14 +24,20 @@ import re
import sys
import threading
+from stdio_buffer import StdioBuffer
from xml.dom import minidom
# Add tools/ to path
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(BASE_PATH, ".."))
-
-import find_depot_tools
-import subprocess2 as subprocess
+try:
+ import find_depot_tools
+ # Fixes a bug in Windows where some shards die upon starting
+ # TODO(charleslee): actually fix this bug
+ import subprocess2 as subprocess
+except ImportError:
+ # Unable to find depot_tools, so just use standard subprocess
+ import subprocess
SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
SS_DEFAULT_NUM_CORES = 4
@@ -180,7 +186,6 @@ def RunShard(test, total_shards, index, gtest_args, stdout, stderr):
return subprocess.Popen(
args, stdout=stdout,
stderr=stderr,
- shell=False,
env=env,
bufsize=0,
universal_newlines=True)
@@ -248,14 +253,24 @@ class ShardRunner(threading.Thread):
index = self.counter.get_nowait()
except Queue.Empty:
break
+ shard_running = True
shard = RunShard(
self.supervisor.test, self.supervisor.total_shards, index,
- self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
- stdout, _ = shard.communicate(timeout=self.supervisor.timeout)
- return_code = shard.poll()
- for line in stdout.splitlines(True):
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
+ buffer = StdioBuffer(shard)
+ # Spawn two threads to collect stdio output
+ stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
+ stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
+ while shard_running:
+ pipe, line = buffer.readline()
+ if pipe is None and line is None:
+ shard_running = False
+ if not line and not shard_running:
+ break
self.ProcessLine(index, line)
- self.supervisor.LogOutputLine(index, line, None)
+ self.supervisor.LogOutputLine(index, line, pipe)
+ stdout_collector_thread.join()
+ stderr_collector_thread.join()
if self.current_test:
self.ReportFailure("INCOMPLETE", index, self.current_test)
self.supervisor.ShardIndexCompleted(index)
@@ -271,6 +286,8 @@ class ShardingSupervisor(object):
num_shards_to_run: Total number of shards to split the test into.
num_runs: Total number of worker threads to create for running shards.
color: Indicates which coloring mode to use in the output.
+ original_order: True if shard output should be printed as it comes.
+ prefix: True if each line should indicate the shard index.
retry_percent: Integer specifying the max percent of tests to retry.
gtest_args: The options to pass to gtest.
failed_tests: List of statements from shard output indicating a failure.
@@ -295,8 +312,9 @@ class ShardingSupervisor(object):
SHARD_COMPLETED = object()
- def __init__(self, test, num_shards_to_run, num_runs, color,
- retry_percent, timeout, total_slaves, slave_index, gtest_args):
+ def __init__(self, test, num_shards_to_run, num_runs, color, original_order,
+ prefix, retry_percent, timeout, total_slaves, slave_index,
+ gtest_args):
"""Inits ShardingSupervisor with given options and gtest arguments."""
self.test = test
# Number of shards to run locally.
@@ -306,6 +324,8 @@ class ShardingSupervisor(object):
self.slave_index = slave_index
self.num_runs = num_runs
self.color = color
+ self.original_order = original_order
+ self.prefix = prefix
self.retry_percent = retry_percent
self.timeout = timeout
self.gtest_args = gtest_args
@@ -357,7 +377,11 @@ class ShardingSupervisor(object):
self, counter, test_start, test_ok, test_fail)
worker.start()
workers.append(worker)
- self.WaitForShards()
+ if self.original_order:
+ for worker in workers:
+ worker.join()
+ else:
+ self.WaitForShards()
# All the shards are done. Merge all the XML files and generate the
# main one.
@@ -424,8 +448,8 @@ class ShardingSupervisor(object):
break
sys.stdout.write(line)
except:
- print 'CAUGHT EXCEPTION: dumping remaining data:'
sys.stdout.flush()
+ print 'CAUGHT EXCEPTION: dumping remaining data:'
for shard_index in range(self.num_shards_to_run):
while True:
try:
@@ -437,17 +461,21 @@ class ShardingSupervisor(object):
if line is self.SHARD_COMPLETED:
break
sys.stdout.write(line)
- sys.stdout.flush()
raise
def LogOutputLine(self, index, line, pipe=sys.stdout):
"""Either prints the shard output line immediately or saves it in the
- output buffer, depending on the settings. Adds a (sys.stdout, line)
- or (sys.stderr, line) tuple in the output queue.
+ output buffer, depending on the settings. Also optionally adds a prefix.
+ Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
- self.shard_output[array_index].put((pipe, line))
+ if self.prefix:
+ line = "%i>%s" % (index, line)
+ if self.original_order:
+ pipe.write(line)
+ else:
+ self.shard_output[array_index].put((pipe, line))
def IncrementTestCount(self):
"""Increments the number of tests run. This is relevant to the
@@ -534,6 +562,19 @@ def main():
parser.add_option(
"-s", "--runshard", type="int", help="single shard index to run")
parser.add_option(
+ "--reorder", action="store_true",
+ help="ensure that all output from an earlier shard is printed before"
+ " output from a later shard")
+ # TODO(charleslee): for backwards compatibility with master.cfg file
+ parser.add_option(
+ "--original-order", action="store_true",
+ help="print shard output in its orginal jumbled order of execution"
+ " (useful for debugging flaky tests)")
+ parser.add_option(
+ "--prefix", action="store_true",
+ help="prefix each line of shard output with 'N>', where N is the shard"
+ " index (forced True when --original-order is True)")
+ parser.add_option(
"--random-seed", action="store_true",
help="shuffle the tests with a random seed value")
parser.add_option(
@@ -576,6 +617,14 @@ def main():
gtest_args = ["--gtest_color=%s" % {
True: "yes", False: "no"}[options.color]] + args[1:]
+ if options.original_order:
+ options.prefix = True
+
+ # TODO(charleslee): for backwards compatibility with buildbot's log_parser
+ if options.reorder:
+ options.original_order = False
+ options.prefix = True
+
if options.random_seed:
seed = random.randint(1, 99999)
gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed])
@@ -592,7 +641,7 @@ def main():
parser.error("Invalid shard number given parameters!")
shard = RunShard(
test, num_shards_to_run, options.runshard, gtest_args, None, None)
- shard.communicate(timeout=self.timeout)
+ shard.communicate()
return shard.poll()
# When running browser_tests, load the test binary into memory before running
@@ -612,8 +661,8 @@ def main():
# shard and run the whole test
ss = ShardingSupervisor(
test, num_shards_to_run, num_runs, options.color,
- options.retry_percent, options.timeout, options.total_slaves,
- options.slave_index, gtest_args)
+ options.original_order, options.prefix, options.retry_percent,
+ options.timeout, options.total_slaves, options.slave_index, gtest_args)
return ss.ShardTest()
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
new file mode 100644
index 0000000..737374d
--- /dev/null
+++ b/tools/sharding_supervisor/stdio_buffer.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
+
+import cStringIO
+import os
+import sys
+import threading
+import Queue
+
+
+class StdioBuffer(object):
+ def __init__(self, shard):
+ self.line_ready_event = threading.Event()
+ self.queue = Queue.Queue()
+ self.lock = threading.Lock()
+ self.completed = 0
+ self.shard = shard
+
+ def _pipe_handler(self, system_pipe, program_pipe):
+ """Helper method for collecting stdio output. Output is collected until
+ a newline is seen, at which point an event is triggered and the line is
+ pushed to a buffer as a (stdio, line) tuple."""
+ buffer = cStringIO.StringIO()
+ pipe_running = True
+ while pipe_running:
+ char = program_pipe.read(1)
+ if not char and self.shard.poll() is not None:
+ pipe_running = False
+ self.line_ready_event.set()
+ buffer.write(char)
+ if char == '\n' or not pipe_running:
+ line = buffer.getvalue()
+ if not line and not pipe_running:
+ with self.lock:
+ self.completed += 1
+ self.line_ready_event.set()
+ break
+ self.queue.put((system_pipe, line))
+ self.line_ready_event.set()
+ buffer.close()
+ buffer = cStringIO.StringIO()
+
+ def handle_pipe(self, system_pipe, program_pipe):
+ t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
+ program_pipe])
+ t.start()
+ return t
+
+ def readline(self):
+ """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
+ blocking call."""
+ while self.completed < 2 and self.queue.empty():
+ self.line_ready_event.wait()
+ self.line_ready_event.clear()
+ if not self.queue.empty():
+ return self.queue.get_nowait()
+ else:
+ return (None, None)
\ No newline at end of file
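Read alongside the ShardRunner hunk above, the intended consumer pattern for the restored StdioBuffer is roughly the following sketch. The ['./my_test'] command line is a placeholder, not something this patch defines; ShardRunner obtains its process from RunShard() instead.

# Usage sketch for StdioBuffer, based on the hunks in this commit.
import subprocess
import sys

from stdio_buffer import StdioBuffer

# Placeholder command; in the supervisor the process comes from RunShard().
shard = subprocess.Popen(['./my_test'], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
buf = StdioBuffer(shard)
# One collector thread per pipe; each queues (sys.stdout|sys.stderr, line) tuples.
out_thread = buf.handle_pipe(sys.stdout, shard.stdout)
err_thread = buf.handle_pipe(sys.stderr, shard.stderr)
while True:
  pipe, line = buf.readline()  # blocks until a line is queued or both pipes close
  if pipe is None and line is None:
    break  # both collector threads finished and the queue is drained
  pipe.write(line)
out_thread.join()
err_thread.join()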