summaryrefslogtreecommitdiffstats
path: root/tools/sharding_supervisor/stdio_buffer.py
diff options
context:
space:
mode:
authorphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-20 20:23:45 +0000
committerphajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-11-20 20:23:45 +0000
commit9af78cb2030b492a2ab5b766b3f803f7c6964a30 (patch)
treeb5380b6a3e9d6c6a44db0c3364af275782a3ca9c /tools/sharding_supervisor/stdio_buffer.py
parenta7e2e82121a8b07a685638f83dacd9e7c1d1602a (diff)
downloadchromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.zip
chromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.tar.gz
chromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.tar.bz2
Revert "GTTF: remove parts of sharding_supervisor that might be hang-prone."
This reverts commit 2aa17efbf65fda1aadc713376ecc4ea6239034d3. Revert "Decrease level of parallelism in sharding supervisor," This reverts commit f6aa2a06d514ec8c8777c541bcd5c69c71785fc3. Revert "Remove XML logic from sharding supervisor. It's broken:" This reverts commit 0c9c0eb95f30e5a2fb4b19b87a89d4e23224cfe5. BUG= Review URL: https://codereview.chromium.org/11411103 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168856 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/sharding_supervisor/stdio_buffer.py')
-rw-r--r--tools/sharding_supervisor/stdio_buffer.py60
1 files changed, 60 insertions, 0 deletions
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py
new file mode 100644
index 0000000..737374d
--- /dev/null
+++ b/tools/sharding_supervisor/stdio_buffer.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Syncronized Standard IO Linebuffer implemented with cStringIO."""
+
+import cStringIO
+import os
+import sys
+import threading
+import Queue
+
+
+class StdioBuffer(object):
+ def __init__(self, shard):
+ self.line_ready_event = threading.Event()
+ self.queue = Queue.Queue()
+ self.lock = threading.Lock()
+ self.completed = 0
+ self.shard = shard
+
+ def _pipe_handler(self, system_pipe, program_pipe):
+ """Helper method for collecting stdio output. Output is collected until
+ a newline is seen, at which point an event is triggered and the line is
+ pushed to a buffer as a (stdio, line) tuple."""
+ buffer = cStringIO.StringIO()
+ pipe_running = True
+ while pipe_running:
+ char = program_pipe.read(1)
+ if not char and self.shard.poll() is not None:
+ pipe_running = False
+ self.line_ready_event.set()
+ buffer.write(char)
+ if char == '\n' or not pipe_running:
+ line = buffer.getvalue()
+ if not line and not pipe_running:
+ with self.lock:
+ self.completed += 1
+ self.line_ready_event.set()
+ break
+ self.queue.put((system_pipe, line))
+ self.line_ready_event.set()
+ buffer.close()
+ buffer = cStringIO.StringIO()
+
+ def handle_pipe(self, system_pipe, program_pipe):
+ t = threading.Thread(target=self._pipe_handler, args=[system_pipe,
+ program_pipe])
+ t.start()
+ return t
+
+ def readline(self):
+ """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a
+ blocking call."""
+ while self.completed < 2 and self.queue.empty():
+ self.line_ready_event.wait()
+ self.line_ready_event.clear()
+ if not self.queue.empty():
+ return self.queue.get_nowait()
+ else:
+ return (None, None) \ No newline at end of file