#!/usr/bin/python3
import sys
import os.path
import subprocess
import signal
import time
import socket
from subprocess import Popen
from threading import Thread
from obci.utils import context as ctx
try:
from queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # python 3.x
SEP = ';'
[docs]class DriverComm(object):
""" Start, stop and communicate with amplifier driver binaries.
Note: To run amplifier as OBCI experiment peer, use subclasses of BinaryDriverWrapper
which fully support INI file configuration.
Example:
>>> from obci.control.peer.peer_config import PeerConfig
>>> import json
>>> conf = PeerConfig('amplifier')
>>> conf.add_local_param('driver_executable', 'dummy_amplifier')
>>> conf.add_local_param('samples_per_packet', '4')
>>> driv = DriverComm(conf)
dummy_amplifier
>>> descr = driv.get_driver_description() # channels_info
>>> dic = json.loads(descr)
>>> driv.start_sampling()
start OK
<BLANKLINE>
>>> time.sleep(3)
>>> driv.terminate_driver()
"""
def __init__(self, peer_config, mx_addresses=[('localhost', 41921)], catch_signals=True,
context=ctx.get_dummy_context('DriverComm')):
""" *peer_config* - parameter provider. Should respond to get_param(param_name, value)
and has_param(param_name) calls. PeerConfig and PeerControl objects are suitable.
*mx_addresses* - list of (host, port) pairs. Port value None means using default
amplifier binary value.
"""
self.config = peer_config
if not hasattr(self, "_mx_addresses"):
# FIXME
self._mx_addresses = mx_addresses
if not hasattr(self, "logger"):
self.logger = context['logger']
self.driver = self.run_driver(self.get_run_args((socket.gethostbyname(
self._mx_addresses[0][0]), self._mx_addresses[0][1])))
self.driver_out_q = Queue()
self.driver_out_thr = Thread(target=enqueue_output,
args=(self.driver.stdout, self.driver_out_q))
self.driver_out_thr.daemon = True # thread dies with the program
self.driver_out_thr.start()
self.driver_err_q = Queue()
self.driver_err_thr = Thread(target=enqueue_output,
args=(self.driver.stderr, self.driver_err_q))
self.driver_err_thr.daemon = True # thread dies with the program
self.driver_err_thr.start()
if catch_signals:
signal.signal(signal.SIGTERM, self.signal_handler())
signal.signal(signal.SIGINT, self.signal_handler())
[docs] def signal_handler(self):
def handler(signum, frame):
self.logger.info("Got signal " + str(signum) + "!!! Stopping driver!")
self.stop_sampling()
self.terminate_driver()
self.logger.info("Exit!")
sys.exit(-signum)
return handler
[docs] def run_driver(self, run_args):
self.logger.info("Executing: " + ' '.join(run_args))
return Popen(run_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
[docs] def get_driver_description(self):
return self._communicate()
[docs] def set_driver_params(self):
self.set_sampling_rate(self.config.get_param("sampling_rate"))
self.set_active_channels(self.config.get_param("active_channels"))
[docs] def get_run_args(self, multiplexer_address):
host, port = multiplexer_address
exe = self.config.get_param('driver_executable')
print(exe)
args = [exe, '-v', self.config.get_param('samples_per_packet')]
if port:
args += ["-h", str(host), '-p', str(port)]
if self.config.has_param("usb_device") and self.config.has_param("bluetooth_device"):
usb = self.config.get_param("usb_device")
if usb:
args.extend(["-d", usb])
elif self.config.get_param("bluetooth_device"):
args.extend(["-b", self.config.get_param("bluetooth_device")])
else:
raise Exception("usb_device or bluetooth_device is required")
if self.config.has_param("amplifier_responses"):
if self.config.get_param("amplifier_responses"):
args.extend(["-r", self.config.get_param("amplifier_responses")])
if self.config.has_param("dump_responses"):
if self.config.get_param("dump_responses"):
args.extend(["--save_responses", self.config.get_param("dump_responses")])
# FIXME FIXME
if "gtec" in exe:
args.extend(["-d", os.path.join(os.path.dirname(exe), "simple_gtec_driver")])
return args
[docs] def set_sampling_rate(self, sampling_rate):
self.logger.info("Set sampling rate: %s " % sampling_rate)
error = self._communicate("sampling_rate " + str(sampling_rate),
timeout_s=2, timeout_error=False)
if error:
print(error)
[docs] def set_active_channels(self, active_channels):
self.logger.info("Set Active channels: %s" % active_channels)
error = self._communicate("active_channels " + str(active_channels),
timeout_s=2, timeout_error=False)
if error:
print(error)
[docs] def start_sampling(self):
signal.signal(signal.SIGINT, self.stop_sampling)
self.logger.info("Start sampling")
error = self._communicate("start", timeout_s=0, timeout_error=False)
if error:
print(error)
self.logger.info("Sampling started")
[docs] def stop_sampling(self, _1=None, _2=None):
# sys.stderr.write("stop sampling")
self.logger.info("Stop sampling")
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.driver.send_signal(signal.SIGINT)
self.logger.info("Sampling stopped")
[docs] def terminate_driver(self):
self.driver.send_signal(signal.SIGTERM)
self.driver.wait()
self.logger.info("Terminated driver.")
[docs] def abort(self, error_msg):
time.sleep(2)
self.logger.error(error_msg)
if self.driver_is_running():
self.logger.info("Stopping driver.")
self.stop_sampling()
if self.driver_is_running():
# TODO does 'stop_sampling' always terminate the driver process???
self.logger.info("driver still not dead")
self.terminate_driver()
sys.exit(1)
[docs] def driver_is_running(self):
self.driver.poll()
return self.driver.returncode is None
def _communicate(self, command="", timeout_s=7, timeout_error=True):
if not self.driver_is_running():
self.logger.error("Driver is not running!!!!!!!")
sys.exit(self.driver.returncode)
get_timeout = .1
count = 0
out = ""
self.driver.stdin.write(command.encode() + b'\n')
self.driver.stdin.flush()
while timeout_s - get_timeout * count >= 0:
line = None
try:
line = self.driver_out_q.get(timeout=get_timeout).decode() # or self.driver_out_q.get_nowait()
except Empty:
count += 1
time.sleep(get_timeout)
pass
if line is None:
if self.driver_is_running():
continue
else:
raise Exception("Driver is dead. ABORTING!!!")
elif len(line) == 0:
self.abort("Got empty string from driver. ABORTING...!!!")
elif line == "\n":
break
else:
count = 0
out += line
if out == "" and timeout_error:
self.abort("Communication with driver unsuccesful, \
timeout " + str(timeout_s) + "s passed. ABORTING!!!")
return out
[docs] def do_samplingg(self):
self.driver.wait()
self.logger.info("Driver finished working with code " + str(self.driver.returncode))
sys.exit(self.driver.returncode)
[docs] def do_sampling(self):
self.logger.info("Stat waiting on drivers output....")
while True: # read and log data from sterr and stoout of the driver...
try:
v = self.driver_err_q.get_nowait()
v = v.decode().rstrip() # amplifiers send binary and like to add \n at the end
self.logger.info(v)
except Empty:
pass
try:
v = self.driver_out_q.get_nowait()
v = v.decode().rstrip() # amplifiers send binary and like to add \n at the end
self.logger.info(v)
except Empty:
time.sleep(0.1)
sys.exit(self.driver.returncode)
[docs]def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
if __name__ == "__main__":
from obci.control.peer.peer_config import PeerConfig
import json
conf = PeerConfig('amplifier')
conf.add_local_param('driver_executable', 'tmsi_amplifier')
conf.add_local_param('samples_per_packet', '4')
conf.add_local_param('bluetooth_device', '')
conf.add_local_param('usb_device', '/dev/tmsi0')
driv = DriverComm(conf)
descr = driv.get_driver_description()
dic = json.loads(descr)
driv.start_sampling()
time.sleep(3)
driv.terminate_driver()