Source code for obci.control.launcher.process_io_handler

#!/usr/bin/python
# -*- coding: utf-8 -*-

import threading
import time
import subprocess

from collections import deque
try:
    from queue import Queue, Empty, Full
except ImportError:
    from queue import Queue, Empty, Full  # python 3.x


STDIO_QUEUE_MAX_SIZE = 8192
STDIO_TAIL_LEN = 128
DEFAULT_TAIL_RQ = 10

LINES_TO_GET = 5
IO_WAIT = 0.5


[docs]def start_stdio_handler(popen_obj, stdio_actions, name, stdout_log, stderr_log): io_handler = None if stdio_actions != (None, None, None): out, err, stdin = stdio_actions out_handle = popen_obj.stdout if out is not None else None if err == subprocess.STDOUT or err is None: err_handle = None else: err_handle = popen_obj.stderr in_handle = popen_obj.stdin if stdin is not None else None io_handler = ProcessIOHandler( name=name, stdout=out_handle, stderr=err_handle, stdin=in_handle, out_log=stdout_log, err_log=stderr_log) io_handler.start_output_handler() return io_handler
[docs]class ProcessIOHandler(object): """Processes data from descriptors (stdout, stderr) in separate threads. Access to the tail is available through attributes *out_tail* and *err_tail*. Communication through stdin, if given, is possible by *communicate()* method. Data is saved to log files if the handles were given in init. """ def __init__(self, name, stdout=None, stderr=None, stdin=None, out_log=None, err_log=None): self.name = name self.stdout = stdout self.stdin = stdin self.stderr = stderr self._output_handler_thread = None self._stop = False self._out_q, self._stdout_thread, self._out_log = \ self.__init_io(self.stdout, out_log) self.out_tail = deque(maxlen=STDIO_TAIL_LEN) self.err_tail = deque(maxlen=STDIO_TAIL_LEN) self._err_q, self._stderr_thread, self._err_log = \ self.__init_io(self.stderr, err_log) if self.stdout or self.stderr: self._start_background_io_reading() def __init_io(self, stream, log_name): q, thr, log = None, None, None if stream is not None: q = Queue(maxsize=STDIO_QUEUE_MAX_SIZE) thr = threading.Thread(target=self._read, args=(stream, q)) thr.daemon = True if log_name: try: log = open(log_name, 'w', buffering=0) except IOError: print("{0} : Could not open log {1}".format(self.name, log_name)) return q, thr, log
[docs] def communicate(self, input, response_timeout=None): # TODO :) return None
[docs] def tail_stdout(self, lines): data = [] for l in range(lines): try: data.append(self.out_tail.pop()) except IndexError: break return list(reversed(data))
[docs] def process_output(self, lines=None, timeout=None): """Check if there is data from stdout and stderr (if it is monitored). Update tail and save data to logs, if they were given in init. Timeout (s) means blocking queue reads, no timeout - non blocking. """ if self.stdout: self._handle_stdout(lines=lines, timeout=timeout) if self.stderr: self._handle_stderr(lines=lines, timeout=timeout)
[docs] def start_output_handler(self): self._output_handler_thread = threading.Thread( target=self._output_handler) self._output_handler_thread.deamon = True self._output_handler_thread.start()
[docs] def stop_output_handler(self): self._stop = True self._output_handler_thread.join(timeout=0.1) if self.stdout: self._stdout_thread.join(timeout=0.1) if self.stderr: self._stderr_thread.join(timeout=0.1) return self.finished()
[docs] def is_running(self): return self._stop is False and self.__io_readers_alive()
[docs] def finished(self): return (not self.__io_readers_alive()) and \ (not self._output_handler_thread.is_alive())
def __io_readers_alive(self): alive = False if self.stdout: alive = self._stdout_thread.is_alive() if self.stderr: alive = alive or self._stderr_thread.is_alive() return alive def _output_handler(self): while self.__io_readers_alive() and not self._stop: self.process_output(lines=LINES_TO_GET, timeout=IO_WAIT) time.sleep(0.1) # IO_WAIT) def _start_background_io_reading(self): if self.stdout: self._stdout_thread.start() if self.stderr: self._stderr_thread.start() def _read(self, stream, queue): print("reading... ", stream) for line in iter(stream.readline, ''): try: queue.put(line) if self._stop: break except Full: # drop it :/ print("Queue full for stream {0} of {1}".format(stream, self.name)) print("Dropping line.") stream.close() def _get_lines(self, stream, q, lines=None, timeout=None): lines = lines if lines else 1 out = [] for num in range(lines): try: if timeout: line = q.get(block=True, timeout=timeout) else: line = q.get(block=False) except Empty: return out else: # got line out.append(line) return out def _handle_stdout(self, lines=None, timeout=None): self._handle_stdio(self.stdout, self._out_q, self._out_log, self.out_tail, lines, timeout) def _handle_stdio(self, stream, q, log, tail, lines=None, timeout=None): out = self._get_lines(stream, q, lines, timeout) tail.extend(out) if log is not None: try: log.writelines(out) except Exception as e: print(e, e.args) def _handle_stderr(self, lines=None, timeout=None): self._handle_stdio(self.stderr, self._err_q, self._err_log, self.err_tail, lines, timeout)