summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
authorhinoka@chromium.org <hinoka@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-09-17 20:41:49 +0000
committerhinoka@chromium.org <hinoka@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-09-17 20:41:49 +0000
commit396ec8c8601319073c0f9b7fb95bf44c4a4fa6f4 (patch)
tree964605853dfaf944418c4e33b64d999e7d0fa336 /tools
parent8c1872cec012d029d6c5489707152ebcf8c32929 (diff)
downloadchromium_src-396ec8c8601319073c0f9b7fb95bf44c4a4fa6f4.zip
chromium_src-396ec8c8601319073c0f9b7fb95bf44c4a4fa6f4.tar.gz
chromium_src-396ec8c8601319073c0f9b7fb95bf44c4a4fa6f4.tar.bz2
Sharding supervisor changes
* Fixed tests in windows * Split up stdout and stderr so they print separately Its not clear whether or not this'll fix http://code.google.com/p/chromium/issues/detail?id=145308&can=3&colspec=ID%20Pri%20Mstone%20ReleaseBlock%20OS%20Area%20Feature%20Status%20Owner%20Summary But the goal is to quarantine stderr from mixing in with the stdout pipe, so this code uses two extra threads per shard to collect stdout and stderr byte by byte, and emit them line by line. Review URL: https://chromiumcodereview.appspot.com/10919228 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@157184 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools')
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor.py44
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor_unittest.py15
-rw-r--r--tools/sharding_supervisor/stdio_buffer.py60
3 files changed, 92 insertions, 27 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 83f860c..89d3ec5 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -24,6 +24,7 @@ import re
import sys
import threading
+from stdio_buffer import StdioBuffer
from xml.dom import minidom
# Add tools/ to path
@@ -252,24 +253,24 @@ class ShardRunner(threading.Thread):
index = self.counter.get_nowait()
except Queue.Empty:
break
- chars = cStringIO.StringIO()
shard_running = True
shard = RunShard(
self.supervisor.test, self.supervisor.total_shards, index,
- self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
+ buffer = StdioBuffer(shard)
+ # Spawn two threads to collect stdio output
+ stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
+ stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
while shard_running:
- char = shard.stdout.read(1)
- if not char and shard.poll() is not None:
+ pipe, line = buffer.readline()
+ if pipe is None and line is None:
shard_running = False
- chars.write(char)
- if char == "\n" or not shard_running:
- line = chars.getvalue()
- if not line and not shard_running:
- break
- self.ProcessLine(index, line)
- self.supervisor.LogOutputLine(index, line)
- chars.close()
- chars = cStringIO.StringIO()
+ if not line and not shard_running:
+ break
+ self.ProcessLine(index, line)
+ self.supervisor.LogOutputLine(index, line, pipe)
+ stdout_collector_thread.join()
+ stderr_collector_thread.join()
if self.current_test:
self.ReportFailure("INCOMPLETE", index, self.current_test)
self.supervisor.ShardIndexCompleted(index)
@@ -292,7 +293,7 @@ class ShardingSupervisor(object):
failed_tests: List of statements from shard output indicating a failure.
failed_shards: List of shards that contained failing tests.
shards_completed: List of flags indicating which shards have finished.
- shard_output: Buffer that stores the output from each shard.
+ shard_output: Buffer that stores output from each shard as (stdio, line).
test_counter: Stores the total number of tests run.
total_slaves: Total number of slaves running this test.
slave_index: Current slave to run tests for.
@@ -431,7 +432,7 @@ class ShardingSupervisor(object):
for shard_index in range(self.num_shards_to_run):
while True:
try:
- line = self.shard_output[shard_index].get(True, self.timeout)
+ pipe, line = self.shard_output[shard_index].get(True, self.timeout)
except Queue.Empty:
# Shard timed out, notice failure and move on.
self.LogShardFailure(shard_index)
@@ -452,7 +453,7 @@ class ShardingSupervisor(object):
for shard_index in range(self.num_shards_to_run):
while True:
try:
- line = self.shard_output[shard_index].get(False)
+ pipe, line = self.shard_output[shard_index].get(False)
except Queue.Empty:
# Shard timed out, notice failure and move on.
self.LogShardFailure(shard_index)
@@ -462,18 +463,19 @@ class ShardingSupervisor(object):
sys.stdout.write(line)
raise
- def LogOutputLine(self, index, line):
+ def LogOutputLine(self, index, line, pipe=sys.stdout):
"""Either prints the shard output line immediately or saves it in the
- output buffer, depending on the settings. Also optionally adds a prefix.
+ output buffer, depending on the settings. Also optionally adds a prefix.
+ Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
if self.prefix:
line = "%i>%s" % (index, line)
if self.original_order:
- sys.stdout.write(line)
+ pipe.write(line)
else:
- self.shard_output[array_index].put(line)
+ self.shard_output[array_index].put((pipe, line))
def IncrementTestCount(self):
"""Increments the number of tests run. This is relevant to the
@@ -487,7 +489,7 @@ class ShardingSupervisor(object):
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
- self.shard_output[array_index].put(self.SHARD_COMPLETED)
+ self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED))
def RetryFailedTests(self):
"""Reruns any failed tests serially and prints another summary of the
diff --git a/tools/sharding_supervisor/sharding_supervisor_unittest.py b/tools/sharding_supervisor/sharding_supervisor_unittest.py
index b6e9ca5..77b1f54 100755
--- a/tools/sharding_supervisor/sharding_supervisor_unittest.py
+++ b/tools/sharding_supervisor/sharding_supervisor_unittest.py
@@ -27,8 +27,10 @@ def generate_expected_output(start, end, num_shards):
stdout = ''
stderr = ''
for i in range(start, end):
- stdout += 'Running shard %d of %d\n' % (i, num_shards)
- stdout += '\nALL SHARDS PASSED!\nALL TESTS PASSED!\n'
+ stdout += 'Running shard %d of %d%s' % (i, num_shards, os.linesep)
+ stdout += '%sALL SHARDS PASSED!%sALL TESTS PASSED!%s' % (os.linesep,
+ os.linesep,
+ os.linesep)
return (stdout, stderr)
@@ -39,8 +41,9 @@ class ShardingSupervisorUnittest(unittest.TestCase):
expected_shards = NUM_CORES * SHARDS_PER_CORE
(expected_out, expected_err) = generate_expected_output(
0, expected_shards, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color', DUMMY_TEST],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
+ DUMMY_TEST], stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
(out, err) = p.communicate()
self.assertEqual(expected_out, out)
@@ -52,7 +55,7 @@ class ShardingSupervisorUnittest(unittest.TestCase):
expected_shards = NUM_CORES * 25
(expected_out, expected_err) = generate_expected_output(
0, expected_shards, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color',
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
'--shards_per_core', '25', DUMMY_TEST],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -72,7 +75,7 @@ class ShardingSupervisorUnittest(unittest.TestCase):
end = begin + NUM_CORES * SHARDS_PER_CORE
(expected_out, expected_err) = generate_expected_output(
begin, end, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color',
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
'--total-slaves', str(total_shards),
'--slave-index', str(index),
DUMMY_TEST],
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
new file mode 100644
index 0000000..737374d
--- /dev/null
+++ b/tools/sharding_supervisor/stdio_buffer.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
+
+import cStringIO
+import os
+import sys
+import threading
+import Queue
+
+
+class StdioBuffer(object):
+ def __init__(self, shard):
+ self.line_ready_event = threading.Event()
+ self.queue = Queue.Queue()
+ self.lock = threading.Lock()
+ self.completed = 0
+ self.shard = shard
+
+ def _pipe_handler(self, system_pipe, program_pipe):
+ """Helper method for collecting stdio output. Output is collected until
+ a newline is seen, at which point an event is triggered and the line is
+ pushed to a buffer as a (stdio, line) tuple."""
+ buffer = cStringIO.StringIO()
+ pipe_running = True
+ while pipe_running:
+ char = program_pipe.read(1)
+ if not char and self.shard.poll() is not None:
+ pipe_running = False
+ self.line_ready_event.set()
+ buffer.write(char)
+ if char == '\n' or not pipe_running:
+ line = buffer.getvalue()
+ if not line and not pipe_running:
+ with self.lock:
+ self.completed += 1
+ self.line_ready_event.set()
+ break
+ self.queue.put((system_pipe, line))
+ self.line_ready_event.set()
+ buffer.close()
+ buffer = cStringIO.StringIO()
+
+ def handle_pipe(self, system_pipe, program_pipe):
+ t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
+ program_pipe])
+ t.start()
+ return t
+
+ def readline(self):
+ """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
+ blocking call."""
+ while self.completed < 2 and self.queue.empty():
+ self.line_ready_event.wait()
+ self.line_ready_event.clear()
+ if not self.queue.empty():
+ return self.queue.get_nowait()
+ else:
+ return (None, None) \ No newline at end of file