diff options
-rwxr-xr-x | tools/sharding_supervisor/sharding_supervisor.py | 44 | ||||
-rwxr-xr-x | tools/sharding_supervisor/sharding_supervisor_unittest.py | 15 | ||||
-rw-r--r-- | tools/sharding_supervisor/stdio_buffer.py | 60 |
3 files changed, 92 insertions, 27 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py index 83f860c..89d3ec5 100755 --- a/tools/sharding_supervisor/sharding_supervisor.py +++ b/tools/sharding_supervisor/sharding_supervisor.py @@ -24,6 +24,7 @@ import re import sys import threading +from stdio_buffer import StdioBuffer from xml.dom import minidom # Add tools/ to path @@ -252,24 +253,24 @@ class ShardRunner(threading.Thread): index = self.counter.get_nowait() except Queue.Empty: break - chars = cStringIO.StringIO() shard_running = True shard = RunShard( self.supervisor.test, self.supervisor.total_shards, index, - self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) + self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE) + buffer = StdioBuffer(shard) + # Spawn two threads to collect stdio output + stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout) + stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr) while shard_running: - char = shard.stdout.read(1) - if not char and shard.poll() is not None: + pipe, line = buffer.readline() + if pipe is None and line is None: shard_running = False - chars.write(char) - if char == "\n" or not shard_running: - line = chars.getvalue() - if not line and not shard_running: - break - self.ProcessLine(index, line) - self.supervisor.LogOutputLine(index, line) - chars.close() - chars = cStringIO.StringIO() + if not line and not shard_running: + break + self.ProcessLine(index, line) + self.supervisor.LogOutputLine(index, line, pipe) + stdout_collector_thread.join() + stderr_collector_thread.join() if self.current_test: self.ReportFailure("INCOMPLETE", index, self.current_test) self.supervisor.ShardIndexCompleted(index) @@ -292,7 +293,7 @@ class ShardingSupervisor(object): failed_tests: List of statements from shard output indicating a failure. failed_shards: List of shards that contained failing tests. shards_completed: List of flags indicating which shards have finished. - shard_output: Buffer that stores the output from each shard. + shard_output: Buffer that stores output from each shard as (stdio, line). test_counter: Stores the total number of tests run. total_slaves: Total number of slaves running this test. slave_index: Current slave to run tests for. @@ -431,7 +432,7 @@ class ShardingSupervisor(object): for shard_index in range(self.num_shards_to_run): while True: try: - line = self.shard_output[shard_index].get(True, self.timeout) + pipe, line = self.shard_output[shard_index].get(True, self.timeout) except Queue.Empty: # Shard timed out, notice failure and move on. self.LogShardFailure(shard_index) @@ -452,7 +453,7 @@ class ShardingSupervisor(object): for shard_index in range(self.num_shards_to_run): while True: try: - line = self.shard_output[shard_index].get(False) + pipe, line = self.shard_output[shard_index].get(False) except Queue.Empty: # Shard timed out, notice failure and move on. self.LogShardFailure(shard_index) @@ -462,18 +463,19 @@ class ShardingSupervisor(object): sys.stdout.write(line) raise - def LogOutputLine(self, index, line): + def LogOutputLine(self, index, line, pipe=sys.stdout): """Either prints the shard output line immediately or saves it in the - output buffer, depending on the settings. Also optionally adds a prefix. + output buffer, depending on the settings. Also optionally adds a prefix. + Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue. """ # Fix up the index. array_index = index - (self.num_shards_to_run * self.slave_index) if self.prefix: line = "%i>%s" % (index, line) if self.original_order: - sys.stdout.write(line) + pipe.write(line) else: - self.shard_output[array_index].put(line) + self.shard_output[array_index].put((pipe, line)) def IncrementTestCount(self): """Increments the number of tests run. This is relevant to the @@ -487,7 +489,7 @@ class ShardingSupervisor(object): """ # Fix up the index. array_index = index - (self.num_shards_to_run * self.slave_index) - self.shard_output[array_index].put(self.SHARD_COMPLETED) + self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED)) def RetryFailedTests(self): """Reruns any failed tests serially and prints another summary of the diff --git a/tools/sharding_supervisor/sharding_supervisor_unittest.py b/tools/sharding_supervisor/sharding_supervisor_unittest.py index b6e9ca5..77b1f54 100755 --- a/tools/sharding_supervisor/sharding_supervisor_unittest.py +++ b/tools/sharding_supervisor/sharding_supervisor_unittest.py @@ -27,8 +27,10 @@ def generate_expected_output(start, end, num_shards): stdout = '' stderr = '' for i in range(start, end): - stdout += 'Running shard %d of %d\n' % (i, num_shards) - stdout += '\nALL SHARDS PASSED!\nALL TESTS PASSED!\n' + stdout += 'Running shard %d of %d%s' % (i, num_shards, os.linesep) + stdout += '%sALL SHARDS PASSED!%sALL TESTS PASSED!%s' % (os.linesep, + os.linesep, + os.linesep) return (stdout, stderr) @@ -39,8 +41,9 @@ class ShardingSupervisorUnittest(unittest.TestCase): expected_shards = NUM_CORES * SHARDS_PER_CORE (expected_out, expected_err) = generate_expected_output( 0, expected_shards, expected_shards) - p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color', DUMMY_TEST], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color', + DUMMY_TEST], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) (out, err) = p.communicate() self.assertEqual(expected_out, out) @@ -52,7 +55,7 @@ class ShardingSupervisorUnittest(unittest.TestCase): expected_shards = NUM_CORES * 25 (expected_out, expected_err) = generate_expected_output( 0, expected_shards, expected_shards) - p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color', + p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color', '--shards_per_core', '25', DUMMY_TEST], stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -72,7 +75,7 @@ class ShardingSupervisorUnittest(unittest.TestCase): end = begin + NUM_CORES * SHARDS_PER_CORE (expected_out, expected_err) = generate_expected_output( begin, end, expected_shards) - p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color', + p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color', '--total-slaves', str(total_shards), '--slave-index', str(index), DUMMY_TEST], diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py new file mode 100644 index 0000000..737374d --- /dev/null +++ b/tools/sharding_supervisor/stdio_buffer.py @@ -0,0 +1,60 @@ +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Syncronized Standard IO Linebuffer implemented with cStringIO.""" + +import cStringIO +import os +import sys +import threading +import Queue + + +class StdioBuffer(object): + def __init__(self, shard): + self.line_ready_event = threading.Event() + self.queue = Queue.Queue() + self.lock = threading.Lock() + self.completed = 0 + self.shard = shard + + def _pipe_handler(self, system_pipe, program_pipe): + """Helper method for collecting stdio output. Output is collected until + a newline is seen, at which point an event is triggered and the line is + pushed to a buffer as a (stdio, line) tuple.""" + buffer = cStringIO.StringIO() + pipe_running = True + while pipe_running: + char = program_pipe.read(1) + if not char and self.shard.poll() is not None: + pipe_running = False + self.line_ready_event.set() + buffer.write(char) + if char == '\n' or not pipe_running: + line = buffer.getvalue() + if not line and not pipe_running: + with self.lock: + self.completed += 1 + self.line_ready_event.set() + break + self.queue.put((system_pipe, line)) + self.line_ready_event.set() + buffer.close() + buffer = cStringIO.StringIO() + + def handle_pipe(self, system_pipe, program_pipe): + t = threading.Thread(target=self._pipe_handler, args=[system_pipe, + program_pipe]) + t.start() + return t + + def readline(self): + """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a + blocking call.""" + while self.completed < 2 and self.queue.empty(): + self.line_ready_event.wait() + self.line_ready_event.clear() + if not self.queue.empty(): + return self.queue.get_nowait() + else: + return (None, None)
\ No newline at end of file |