summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor.py44
-rwxr-xr-xtools/sharding_supervisor/sharding_supervisor_unittest.py15
-rw-r--r--tools/sharding_supervisor/stdio_buffer.py60
3 files changed, 92 insertions, 27 deletions
diff --git a/tools/sharding_supervisor/sharding_supervisor.py b/tools/sharding_supervisor/sharding_supervisor.py
index 83f860c..89d3ec5 100755
--- a/tools/sharding_supervisor/sharding_supervisor.py
+++ b/tools/sharding_supervisor/sharding_supervisor.py
@@ -24,6 +24,7 @@ import re
import sys
import threading
+from stdio_buffer import StdioBuffer
from xml.dom import minidom
# Add tools/ to path
@@ -252,24 +253,24 @@ class ShardRunner(threading.Thread):
index = self.counter.get_nowait()
except Queue.Empty:
break
- chars = cStringIO.StringIO()
shard_running = True
shard = RunShard(
self.supervisor.test, self.supervisor.total_shards, index,
- self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
+ self.supervisor.gtest_args, subprocess.PIPE, subprocess.PIPE)
+ buffer = StdioBuffer(shard)
+ # Spawn two threads to collect stdio output
+ stdout_collector_thread = buffer.handle_pipe(sys.stdout, shard.stdout)
+ stderr_collector_thread = buffer.handle_pipe(sys.stderr, shard.stderr)
while shard_running:
- char = shard.stdout.read(1)
- if not char and shard.poll() is not None:
+ pipe, line = buffer.readline()
+ if pipe is None and line is None:
shard_running = False
- chars.write(char)
- if char == "\n" or not shard_running:
- line = chars.getvalue()
- if not line and not shard_running:
- break
- self.ProcessLine(index, line)
- self.supervisor.LogOutputLine(index, line)
- chars.close()
- chars = cStringIO.StringIO()
+ if not line and not shard_running:
+ break
+ self.ProcessLine(index, line)
+ self.supervisor.LogOutputLine(index, line, pipe)
+ stdout_collector_thread.join()
+ stderr_collector_thread.join()
if self.current_test:
self.ReportFailure("INCOMPLETE", index, self.current_test)
self.supervisor.ShardIndexCompleted(index)
@@ -292,7 +293,7 @@ class ShardingSupervisor(object):
failed_tests: List of statements from shard output indicating a failure.
failed_shards: List of shards that contained failing tests.
shards_completed: List of flags indicating which shards have finished.
- shard_output: Buffer that stores the output from each shard.
+ shard_output: Buffer that stores output from each shard as (stdio, line).
test_counter: Stores the total number of tests run.
total_slaves: Total number of slaves running this test.
slave_index: Current slave to run tests for.
@@ -431,7 +432,7 @@ class ShardingSupervisor(object):
for shard_index in range(self.num_shards_to_run):
while True:
try:
- line = self.shard_output[shard_index].get(True, self.timeout)
+ pipe, line = self.shard_output[shard_index].get(True, self.timeout)
except Queue.Empty:
# Shard timed out, notice failure and move on.
self.LogShardFailure(shard_index)
@@ -452,7 +453,7 @@ class ShardingSupervisor(object):
for shard_index in range(self.num_shards_to_run):
while True:
try:
- line = self.shard_output[shard_index].get(False)
+ pipe, line = self.shard_output[shard_index].get(False)
except Queue.Empty:
# Shard timed out, notice failure and move on.
self.LogShardFailure(shard_index)
@@ -462,18 +463,19 @@ class ShardingSupervisor(object):
sys.stdout.write(line)
raise
- def LogOutputLine(self, index, line):
+ def LogOutputLine(self, index, line, pipe=sys.stdout):
"""Either prints the shard output line immediately or saves it in the
- output buffer, depending on the settings. Also optionally adds a prefix.
+ output buffer, depending on the settings. Also optionally adds a prefix.
+ Adds a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
if self.prefix:
line = "%i>%s" % (index, line)
if self.original_order:
- sys.stdout.write(line)
+ pipe.write(line)
else:
- self.shard_output[array_index].put(line)
+ self.shard_output[array_index].put((pipe, line))
def IncrementTestCount(self):
"""Increments the number of tests run. This is relevant to the
@@ -487,7 +489,7 @@ class ShardingSupervisor(object):
"""
# Fix up the index.
array_index = index - (self.num_shards_to_run * self.slave_index)
- self.shard_output[array_index].put(self.SHARD_COMPLETED)
+ self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED))
def RetryFailedTests(self):
"""Reruns any failed tests serially and prints another summary of the
diff --git a/tools/sharding_supervisor/sharding_supervisor_unittest.py b/tools/sharding_supervisor/sharding_supervisor_unittest.py
index b6e9ca5..77b1f54 100755
--- a/tools/sharding_supervisor/sharding_supervisor_unittest.py
+++ b/tools/sharding_supervisor/sharding_supervisor_unittest.py
@@ -27,8 +27,10 @@ def generate_expected_output(start, end, num_shards):
stdout = ''
stderr = ''
for i in range(start, end):
- stdout += 'Running shard %d of %d\n' % (i, num_shards)
- stdout += '\nALL SHARDS PASSED!\nALL TESTS PASSED!\n'
+ stdout += 'Running shard %d of %d%s' % (i, num_shards, os.linesep)
+ stdout += '%sALL SHARDS PASSED!%sALL TESTS PASSED!%s' % (os.linesep,
+ os.linesep,
+ os.linesep)
return (stdout, stderr)
@@ -39,8 +41,9 @@ class ShardingSupervisorUnittest(unittest.TestCase):
expected_shards = NUM_CORES * SHARDS_PER_CORE
(expected_out, expected_err) = generate_expected_output(
0, expected_shards, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color', DUMMY_TEST],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
+ DUMMY_TEST], stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
(out, err) = p.communicate()
self.assertEqual(expected_out, out)
@@ -52,7 +55,7 @@ class ShardingSupervisorUnittest(unittest.TestCase):
expected_shards = NUM_CORES * 25
(expected_out, expected_err) = generate_expected_output(
0, expected_shards, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color',
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
'--shards_per_core', '25', DUMMY_TEST],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -72,7 +75,7 @@ class ShardingSupervisorUnittest(unittest.TestCase):
end = begin + NUM_CORES * SHARDS_PER_CORE
(expected_out, expected_err) = generate_expected_output(
begin, end, expected_shards)
- p = subprocess.Popen([SHARDING_SUPERVISOR, '--no-color',
+ p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
'--total-slaves', str(total_shards),
'--slave-index', str(index),
DUMMY_TEST],
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
new file mode 100644
index 0000000..737374d
--- /dev/null
+++ b/tools/sharding_supervisor/stdio_buffer.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
+
+import cStringIO
+import os
+import sys
+import threading
+import Queue
+
+
+class StdioBuffer(object):
+ def __init__(self, shard):
+ self.line_ready_event = threading.Event()
+ self.queue = Queue.Queue()
+ self.lock = threading.Lock()
+ self.completed = 0
+ self.shard = shard
+
+ def _pipe_handler(self, system_pipe, program_pipe):
+ """Helper method for collecting stdio output. Output is collected until
+ a newline is seen, at which point an event is triggered and the line is
+ pushed to a buffer as a (stdio, line) tuple."""
+ buffer = cStringIO.StringIO()
+ pipe_running = True
+ while pipe_running:
+ char = program_pipe.read(1)
+ if not char and self.shard.poll() is not None:
+ pipe_running = False
+ self.line_ready_event.set()
+ buffer.write(char)
+ if char == '\n' or not pipe_running:
+ line = buffer.getvalue()
+ if not line and not pipe_running:
+ with self.lock:
+ self.completed += 1
+ self.line_ready_event.set()
+ break
+ self.queue.put((system_pipe, line))
+ self.line_ready_event.set()
+ buffer.close()
+ buffer = cStringIO.StringIO()
+
+ def handle_pipe(self, system_pipe, program_pipe):
+ t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
+ program_pipe])
+ t.start()
+ return t
+
+ def readline(self):
+ """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
+ blocking call."""
+ while self.completed < 2 and self.queue.empty():
+ self.line_ready_event.wait()
+ self.line_ready_event.clear()
+ if not self.queue.empty():
+ return self.queue.get_nowait()
+ else:
+ return (None, None) \ No newline at end of file