# Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Syncronized Standard IO Linebuffer implemented with cStringIO.""" import cStringIO import os import sys import threading import Queue class StdioBuffer(object): def __init__(self, shard): self.line_ready_event = threading.Event() self.queue = Queue.Queue() self.lock = threading.Lock() self.completed = 0 self.shard = shard def _pipe_handler(self, system_pipe, program_pipe): """Helper method for collecting stdio output. Output is collected until a newline is seen, at which point an event is triggered and the line is pushed to a buffer as a (stdio, line) tuple.""" buffer = cStringIO.StringIO() pipe_running = True while pipe_running: char = program_pipe.read(1) if not char and self.shard.poll() is not None: pipe_running = False self.line_ready_event.set() buffer.write(char) if char == '\n' or not pipe_running: line = buffer.getvalue() if not line and not pipe_running: with self.lock: self.completed += 1 self.line_ready_event.set() break self.queue.put((system_pipe, line)) self.line_ready_event.set() buffer.close() buffer = cStringIO.StringIO() def handle_pipe(self, system_pipe, program_pipe): t = threading.Thread(target=self._pipe_handler, args=[system_pipe, program_pipe]) t.start() return t def readline(self): """Emits a tuple of (sys.stderr, line) or (sys.stdout, line). This is a blocking call.""" while self.completed < 2 and self.queue.empty(): self.line_ready_event.wait() self.line_ready_event.clear() if not self.queue.empty(): return self.queue.get_nowait() else: return (None, None)