diff options
author | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-20 20:23:45 +0000 |
---|---|---|
committer | phajdan.jr@chromium.org <phajdan.jr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-20 20:23:45 +0000 |
commit | 9af78cb2030b492a2ab5b766b3f803f7c6964a30 (patch) | |
tree | b5380b6a3e9d6c6a44db0c3364af275782a3ca9c /tools/sharding_supervisor/stdio_buffer.py | |
parent | a7e2e82121a8b07a685638f83dacd9e7c1d1602a (diff) | |
download | chromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.zip chromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.tar.gz chromium_src-9af78cb2030b492a2ab5b766b3f803f7c6964a30.tar.bz2 |
Revert "GTTF: remove parts of sharding_supervisor that might be hang-prone."
This reverts commit 2aa17efbf65fda1aadc713376ecc4ea6239034d3.
Revert "Decrease level of parallelism in sharding supervisor,"
This reverts commit f6aa2a06d514ec8c8777c541bcd5c69c71785fc3.
Revert "Remove XML logic from sharding supervisor. It's broken:"
This reverts commit 0c9c0eb95f30e5a2fb4b19b87a89d4e23224cfe5.
BUG=
Review URL: https://codereview.chromium.org/11411103
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168856 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/sharding_supervisor/stdio_buffer.py')
-rw-r--r-- | tools/sharding_supervisor/stdio_buffer.py | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/tools/sharding_supervisor/stdio_buffer.py b/tools/sharding_supervisor/stdio_buffer.py new file mode 100644 index 0000000..737374d --- /dev/null +++ b/tools/sharding_supervisor/stdio_buffer.py @@ -0,0 +1,60 @@ +# Copyright (c) 2012 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""Syncronized Standard IO Linebuffer implemented with cStringIO.""" + +import cStringIO +import os +import sys +import threading +import Queue + + +class StdioBuffer(object): + def __init__(self, shard): + self.line_ready_event = threading.Event() + self.queue = Queue.Queue() + self.lock = threading.Lock() + self.completed = 0 + self.shard = shard + + def _pipe_handler(self, system_pipe, program_pipe): + """Helper method for collecting stdio output. Output is collected until + a newline is seen, at which point an event is triggered and the line is + pushed to a buffer as a (stdio, line) tuple.""" + buffer = cStringIO.StringIO() + pipe_running = True + while pipe_running: + char = program_pipe.read(1) + if not char and self.shard.poll() is not None: + pipe_running = False + self.line_ready_event.set() + buffer.write(char) + if char == '\n' or not pipe_running: + line = buffer.getvalue() + if not line and not pipe_running: + with self.lock: + self.completed += 1 + self.line_ready_event.set() + break + self.queue.put((system_pipe, line)) + self.line_ready_event.set() + buffer.close() + buffer = cStringIO.StringIO() + + def handle_pipe(self, system_pipe, program_pipe): + t = threading.Thread(target=self._pipe_handler, args=[system_pipe, + program_pipe]) + t.start() + return t + + def readline(self): + """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a + blocking call.""" + while self.completed < 2 and self.queue.empty(): + self.line_ready_event.wait() + self.line_ready_event.clear() + if not self.queue.empty(): + return self.queue.get_nowait() + else: + return (None, None)
\ No newline at end of file |