#!/usr/bin/python3
# -*- coding: utf-8 -*-
import threading
import subprocess
import sys
import os
import socket
import zmq
from obci.control.common.message import OBCIMessageTool, PollingObject, send_msg
from obci.control.launcher.launcher_messages import message_templates
from obci.utils.openbci_logging import get_logger
from obci.control.launcher.process_io_handler import start_stdio_handler
from obci.control.launcher.local_process import LocalProcess
from obci.control.launcher.remote_process import RemoteProcess
from obci.control.launcher.process import (FAILED, FINISHED, TERMINATED, UNKNOWN,
PING, RETURNCODE)
NO_STDIO = 0
STDOUT = 1
STDERR = 2
STDIN = 4
STDIO = [NO_STDIO, STDOUT, STDERR, STDIN]
PYTHON_CALL = ['obci_run_peer']
REGISTER_TIMEOUT = 3
_DEFAULT_TIMEOUT = 20
_DEFAULT_TIMEOUT_MS = 6000
[docs]class SubprocessMonitor(object):
def __init__(self, zmq_ctx, uuid, logger=None, obci_dns=None):
self._processes = {}
self._ctx = zmq_ctx
self.uuid = uuid
self.logger = logger or get_logger('subprocess_monitor',
stream_level='warning')
self.obci_dns = obci_dns
self._mtool = OBCIMessageTool(message_templates)
self.poller = PollingObject()
self._proc_lock = threading.RLock()
[docs] def not_running_processes(self):
status = {}
with self._proc_lock:
for key, proc in self._processes.items():
st = proc.status()
if st[0] in [FINISHED, FAILED, TERMINATED] and not proc.marked_delete():
status[key] = st
return status
[docs] def unknown_status_processes(self):
with self._proc_lock:
return [proc for proc in self._processes.values()
if proc.status()[0] == UNKNOWN]
[docs] def process(self, machine_ip, pid):
with self._proc_lock:
return self._processes.get((machine_ip, pid), None)
[docs] def killall(self, force=False):
with self._proc_lock:
for proc in self._processes.values():
kill_method = proc.kill if not force else proc.kill_with_force
if proc.status()[0] not in [FINISHED, FAILED, TERMINATED]:
kill_method()
[docs] def delete(self, machine, pid):
proc = self._processes.get((machine, pid), None)
if proc is None:
raise Exception("Process not found: " + str((machine, pid)))
if not proc.running():
del self._processes[(machine, pid)]
return True
else:
self.logger.error("Process is running, will not delete! " + str((machine, pid)))
return False
[docs] def delete_all(self):
with self._proc_lock:
for proc in list(self._processes.values()):
del proc
self._processes = {}
[docs] def stop_monitoring(self):
with self._proc_lock:
for proc in self._processes.values():
proc.stop_monitoring()
def _launch_args(self, path, args):
# TODO fix interpreter calls // only python is supported
if path.endswith('.py'):
launch_args = PYTHON_CALL + [path] + args
else:
launch_args = [path] + args
return launch_args
def _stdio_actions(self, io_flags):
out = subprocess.PIPE if io_flags & STDOUT else None
if io_flags & STDERR:
err = subprocess.PIPE
elif out is not None:
err = subprocess.STDOUT
else:
err = None
stdin = subprocess.PIPE if io_flags & STDIN else None
return (out, err, stdin)
def _local_launch(self, launch_args, stdio_actions, env):
ON_POSIX = 'posix' in sys.builtin_module_names
out, err, stdin = stdio_actions
try:
if sys.platform == "win32":
crflags = subprocess.CREATE_NEW_PROCESS_GROUP
popen_obj = subprocess.Popen(launch_args,
stdout=out, stderr=err, stdin=stdin,
bufsize=1, close_fds=ON_POSIX, env=env,
creationflags=crflags)
else:
popen_obj = subprocess.Popen(launch_args,
stdout=out, stderr=err, stdin=stdin,
bufsize=1, close_fds=ON_POSIX, env=env)
details = "Popen constructor finished for " +\
str(launch_args[:3]) + "(...)"
self.logger.info(details)
return popen_obj, details
except OSError as e:
details = "Unable to spawn process {0} [{1}]".format(launch_args, e.args)
self.logger.error(details)
return None, details
except ValueError as e:
details = "Unable to spawn process (bad arguments) {0} [{1}]".format(launch_args, e.args)
self.logger.error(details)
return None, details
except Exception as e:
details = "Process launch Error: " + str(e) + str(e.args) + str(vars(e))
self.logger.error(details)
return None, details
[docs] def new_local_process(self, path, args, proc_type='', name='',
capture_io=STDOUT | STDIN,
stdout_log=None,
stderr_log=None,
register_timeout_desc=None,
monitoring_optflags=RETURNCODE | PING,
machine_ip=None,
env=None):
launch_args = self._launch_args(path, args)
self.logger.debug(proc_type + " local path: " + path)
machine = machine_ip if machine_ip else socket.gethostname()
std_actions = self._stdio_actions(capture_io)
timeout_desc = register_timeout_desc
self.logger.debug('process launch arg list: %s', launch_args)
popen_obj, details = self._local_launch(launch_args, std_actions, env)
if popen_obj is None:
return None, details
if popen_obj.returncode is not None:
det = "opened process already terminated" + popen_obj.communicate()
self.logger.warning(det)
if not name:
name = os.path.basename(path)
process_desc = ProcessDescription(proc_type=proc_type,
name=name,
path=path,
args=args,
machine_ip=machine,
pid=popen_obj.pid)
# io_handler will be None if no stdio is captured
io_handler = start_stdio_handler(popen_obj, std_actions,
':'.join([machine, path, name]),
stdout_log, stderr_log)
new_proc = LocalProcess(process_desc, popen_obj, io_handler=io_handler,
reg_timeout_desc=timeout_desc,
monitoring_optflags=monitoring_optflags,
logger=self.logger)
if monitoring_optflags & PING:
new_proc._ctx = self._ctx
with self._proc_lock:
self._processes[(machine, popen_obj.pid)] = new_proc
new_proc.start_monitoring()
return new_proc, None
[docs] def new_remote_process(self, path, args, proc_type, name,
machine_ip, conn_addr,
capture_io=STDOUT | STDIN,
stdout_log=None,
stderr_log=None,
register_timeout_desc=None,
monitoring_optflags=PING):
"""Send a request to conn_addr for a process launch. By default
the process will be monitored with ping requests and locally by the
remote peer."""
timeout_desc = register_timeout_desc
rq_message = self._mtool.fill_msg('launch_process',
path=path,
args=args, proc_type=proc_type,
name=name,
machine_ip=machine_ip,
capture_io=capture_io,
stdout_log=stdout_log,
stderr_log=stderr_log)
rq_sock = self._ctx.socket(zmq.REQ)
try:
rq_sock.connect(conn_addr)
except zmq.ZMQError as e:
det = "Could not connect to {0}, err: {1}, {2}".format(
conn_addr, e, e.args)
self.logger.error(det)
return None, det
self.logger.info("SENDING LAUNCH REQUEST {0} {1} {2} {3}".format(
machine_ip, _DEFAULT_TIMEOUT_MS, 'ms', conn_addr))
send_msg(rq_sock, rq_message)
result, details = self.poller.poll_recv(rq_sock, _DEFAULT_TIMEOUT_MS)
rq_sock.close()
if not result:
details = details + " [address was: {0}]".format(conn_addr)
self.logger.error(details)
return None, details
else:
result = self._mtool.unpack_msg(result)
if result.type == 'rq_error':
det = "REQUEST FAILED" + str(result.err_code) + ':' + str(result.details)
self.logger.error(det)
return None, det
elif result.type == 'launched_process_info':
self.logger.info("REQUEST SUCCESS %s", result.dict())
process_desc = ProcessDescription(proc_type=result.proc_type,
name=result.name,
path=result.path,
args=args,
machine_ip=result.machine,
pid=result.pid)
new_proc = RemoteProcess(process_desc, conn_addr,
reg_timeout_desc=timeout_desc,
monitoring_optflags=monitoring_optflags,
logger=self.logger)
if monitoring_optflags & PING:
new_proc._ctx = self._ctx
with self._proc_lock:
self._processes[(result.machine, result.pid)] = new_proc
new_proc.start_monitoring()
return new_proc, None
[docs]class ProcessDescription(object):
def __init__(self, proc_type, name, path, args, machine_ip, pid=None):
self.proc_type = proc_type
self.name = name
self.uuid = None
self.path = path
self.args = args
self.machine_ip = machine_ip
self.pid = pid
[docs] def dict(self):
return dict(proc_type=self.proc_type,
name=self.name,
uuid=self.uuid,
path=self.path,
args=self.args,
machine_ip=self.machine_ip,
pid=self.pid)
[docs]class TimeoutDescription(object):
def __init__(self, timeout=REGISTER_TIMEOUT, timeout_method=None,
timeout_args=[]):
self.timeout = timeout
self.timeout_method = timeout_method if timeout_method else \
self.default_timeout_method
self.timeout_args = timeout_args
[docs] def default_timeout_method(self):
return None
[docs] def timer(self):
return threading.Timer(self.timeout, self.timeout_method, self.timeout_args)
[docs]def default_timeout_handler():
return TimeoutDescription()