#!/usr/bin/python
# -*- coding: utf-8 -*-
import threading
import time
import socket
import zmq
from obci.control.common.message import OBCIMessageTool, PollingObject, send_msg
from obci.control.launcher.launcher_messages import message_templates
import obci.control.common.net_tools as net
from obci.utils.openbci_logging import get_logger
# Status values stored in Process._status and returned by Process.status().
UNKNOWN = 'unknown'
RUNNING = 'running'
FAILED = 'failed'
FINISHED = 'finished'
TERMINATED = 'terminated'
NON_RESPONSIVE = 'non_responsive'
PROCESS_STATUS = [
    UNKNOWN,
    RUNNING,
    FAILED,
    FINISHED,
    TERMINATED,
    NON_RESPONSIVE,
]

# Monitoring option flags; Process.__init__ tests them with bitwise AND,
# so they may be combined with ``|``.
PING = 1 << 1
RETURNCODE = 1 << 2
MONITORING_OPTIONS = [
    PING,
    RETURNCODE,
]

# Timer type passed to Process.new_timer() for the registration timeout.
REG_TIMER = 0
class Process(object):
    """Monitor for a single process described by ``proc_description``.

    Keeps a status (one of ``PROCESS_STATUS``) with optional details,
    optionally arms a registration timeout, and can watch the process from
    background threads: by pinging it over a ZMQ REQ socket (``PING`` flag)
    and/or by watching its return code (``RETURNCODE`` flag).

    Subclasses must implement ``is_local``, ``_do_handle_timeout``,
    ``returncode_monitor`` and ``kill``.
    """

    def __init__(self, proc_description,
                 reg_timeout_desc=None,
                 monitoring_optflags=PING,
                 logger=None):
        """Create the monitor and, if requested, start the registration timer.

        :param proc_description: object exposing ``machine_ip``, ``pid``,
            ``path``, ``proc_type`` and ``name`` attributes.
        :param reg_timeout_desc: ``None`` when the process does not have to
            register; otherwise an object with ``timeout``,
            ``timeout_method`` and ``timeout_args`` attributes.
        :param monitoring_optflags: bitwise combination of ``PING`` and
            ``RETURNCODE``.
        :param logger: logger to use; when falsy a new one is obtained from
            ``get_logger``.
        """
        self.desc = proc_description
        self.must_register = reg_timeout_desc is not None
        # Re-entrant lock guarding _status, _status_details and delete.
        self._status_lock = threading.RLock()
        # NOTE(review): set_registration_timeout_handler() below resets the
        # status to UNKNOWN unconditionally, so the RUNNING value chosen here
        # does not survive construction. Left as-is because callers may rely
        # on it -- confirm the intended initial status.
        self._status = UNKNOWN if self.must_register else RUNNING
        self._status_details = None
        self.ping_it = monitoring_optflags & PING
        # The return code can only be watched when a pid is known.
        self.check_returncode = monitoring_optflags & RETURNCODE if \
            self.desc.pid is not None else False
        self.logger = logger or get_logger(
            'subprocess_monitor' + '-' + self.desc.name + '-' + str(self.desc.pid),
            stream_level='info')
        self.set_registration_timeout_handler(reg_timeout_desc)
        self.registration_data = None
        self._stop_monitoring = False
        self._ping_thread = None
        # Maximum number of 1500 ms polls spent waiting for one ping reply.
        self._ping_retries = 8
        self._returncode_thread = None
        self._mtool = OBCIMessageTool(message_templates)
        self._ctx = None
        self.rq_sock = None
        self._poller = PollingObject()
        self.delete = False

    @property
    def machine_ip(self):
        """``machine_ip`` of the process description."""
        return self.desc.machine_ip

    @property
    def pid(self):
        """``pid`` of the process description."""
        return self.desc.pid

    @property
    def path(self):
        """``path`` of the process description."""
        return self.desc.path

    @property
    def proc_type(self):
        """``proc_type`` of the process description."""
        return self.desc.proc_type

    @property
    def name(self):
        """``name`` of the process description."""
        return self.desc.name

    def status(self):
        """Return the ``(status, status_details)`` pair, read under the lock."""
        with self._status_lock:
            return self._status, self._status_details

    def set_registration_timeout_handler(self, reg_timeout_desc):
        """Reset the status to ``UNKNOWN`` and (re)arm the registration timer.

        :param reg_timeout_desc: ``None`` to disable the registration
            requirement; otherwise a timeout description (see ``new_timer``)
            for which a timer is created and started immediately.
        """
        with self._status_lock:
            self._status = UNKNOWN
            self._status_details = None
        self.must_register = reg_timeout_desc is not None
        self.reg_timeout_desc = reg_timeout_desc
        self.reg_timer = None if not self.must_register else \
            self.new_timer(self.reg_timeout_desc, REG_TIMER)
        if self.must_register:
            self.reg_timer.start()

    def is_local(self):
        """Must be implemented by subclasses."""
        raise NotImplementedError()

    def timeout_handler(self, custom_method, args, type_):
        """Timer callback: run the internal handler, then ``custom_method``.

        :param custom_method: callable invoked as ``custom_method(*args)``.
        :param args: positional arguments for ``custom_method``.
        :param type_: timer type forwarded to ``_do_handle_timeout``.
        """
        self._do_handle_timeout(type_)
        custom_method(*args)

    def _do_handle_timeout(self, type_):
        """Must be implemented by subclasses."""
        raise NotImplementedError()

    def new_timer(self, tim_desc, type_):
        """Build (without starting) a ``threading.Timer`` for ``tim_desc``.

        :param tim_desc: object with ``timeout``, ``timeout_method`` and
            ``timeout_args`` attributes.
        :param type_: timer type passed through to ``timeout_handler``.
        :return: an unstarted ``threading.Timer`` that calls
            ``timeout_handler`` after ``tim_desc.timeout``.
        """
        return threading.Timer(tim_desc.timeout, self.timeout_handler,
                               [tim_desc.timeout_method, tim_desc.timeout_args, type_])

    def registered(self, reg_data):
        """Record a registration: cancel the timer and mark as ``RUNNING``.

        When pinging is enabled, a fresh REQ socket is created and connected
        to the addresses in ``reg_data.rep_addrs``.

        :param reg_data: registration data exposing ``machine_ip`` and
            ``rep_addrs``; its ``vars()`` are logged.
        """
        if self.reg_timer is not None:
            self.reg_timer.cancel()
        self.logger.info("{0} [{1}] REGISTERED!!! {2}".format(
            self.name, self.proc_type, reg_data.machine_ip))
        with self._status_lock:
            self._status = RUNNING
        # TODO validate registration data
        self.registration_data = reg_data
        self.logger.info("reg_data" + str(vars(reg_data)))
        if self.ping_it:
            if not self._ctx:
                self._ctx = zmq.Context()
            self.rq_sock = self._ctx.socket(zmq.REQ)
            for addr in reg_data.rep_addrs:
                # Skip addresses reported as local when the registering
                # machine differs from this host's name.
                if (reg_data.machine_ip != socket.gethostname()
                        and net.addr_is_local(addr)):
                    continue
                self.logger.debug(self.name + "connecting to " + addr)
                self.rq_sock.connect(addr)

    def stop_monitoring(self):
        """Cancel the registration timer and join the monitoring threads."""
        if self.reg_timer:
            self.reg_timer.cancel()
            self.reg_timer = None
        # Flag polled by ping_monitor; set before joining so it can exit.
        self._stop_monitoring = True
        if self._ping_thread is not None:
            self.logger.info("%s, %s, %s",
                             self.proc_type, self.name, "Joining ping thread")
            self._ping_thread.join()
        if self._returncode_thread is not None:
            self.logger.info("%s %s %s",
                             self.proc_type, self.name, "joining returncode thread")
            self._returncode_thread.join()
        self.logger.info("monitor for: %s, %s, %s",
                         self.proc_type, self.name, " ...monitoring threads stopped.")

    def finished(self):
        """Return True when no started monitoring thread is alive any more.

        Also True when no monitoring thread was ever started.
        """
        finished = True
        if self._ping_thread is not None:
            finished = not self._ping_thread.is_alive()
        if self._returncode_thread is not None:
            finished = finished and not self._returncode_thread.is_alive()
        return finished

    def process_is_running(self):
        """Return True when every started monitoring thread is still alive.

        Also True when no monitoring thread was ever started.
        """
        running = True
        if self._ping_thread is not None:
            running = self._ping_thread.is_alive()
        if self._returncode_thread is not None:
            running = running and self._returncode_thread.is_alive()
        return running

    def start_monitoring(self):
        """Start the daemon threads selected by the monitoring flags."""
        if self.ping_it:
            self._ping_thread = threading.Thread(target=self.ping_monitor, args=())
            self._ping_thread.daemon = True
            self._ping_thread.start()
        if self.check_returncode:
            self._returncode_thread = threading.Thread(target=self.returncode_monitor, args=())
            self._returncode_thread.daemon = True
            self._returncode_thread.start()

    def ping_monitor(self):
        """Ping loop run by the ping thread.

        Every 2 seconds, once ``rq_sock`` exists, a ``ping`` message is sent
        and a reply is awaited for at most ``_ping_retries`` polls of
        1500 ms each. If no reply arrives and monitoring was not stopped,
        the status becomes ``NON_RESPONSIVE`` (unless it already is
        ``FAILED`` or ``FINISHED``) and the loop ends. The request socket is
        closed when the loop exits.
        """
        is_alive = True
        try:
            while not self._stop_monitoring and is_alive:
                time.sleep(2)
                if self.rq_sock is not None:
                    send_msg(self.rq_sock, self._mtool.fill_msg('ping'))
                    result = None
                    # Count retries locally per ping: the shared attribute
                    # was never decremented, so the wait used to be
                    # unbounded and the no-response branch unreachable.
                    retries_left = self._ping_retries
                    while retries_left and not result and not self._stop_monitoring:
                        result, _det = self._poller.poll_recv(socket=self.rq_sock,
                                                              timeout=1500)
                        retries_left -= 1
                    if not result and not self._stop_monitoring:
                        self.logger.info("%s %s %s",
                                         self.proc_type, self.name, "NO RESPONSE TO PING!")
                        with self._status_lock:
                            if self._status not in [FAILED, FINISHED]:
                                self._status = NON_RESPONSIVE
                                self._status_details = 'ping response timeout'
                            self.logger.info("status: %s", self._status)
                        is_alive = False
        finally:
            if self.rq_sock is not None:
                self.rq_sock.close(linger=0)

    def returncode_monitor(self):
        """Must be implemented by subclasses."""
        raise NotImplementedError()

    def kill(self):
        """Must be implemented by subclasses."""
        raise NotImplementedError()

    def mark_delete(self):
        """Set the ``delete`` flag under the status lock."""
        with self._status_lock:
            self.delete = True

    def marked_delete(self):
        """Return the ``delete`` flag, read under the status lock."""
        with self._status_lock:
            return self.delete