Source code for obci.drivers.generic.py_amplifier_soft
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
from obci.drivers.generic import py_amplifier
from obci.utils.openbci_logging import log_crash
from obci.utils import streaming_debug
class PyAmplifierSoft(py_amplifier.PyAmplifier):
    """Amplifier base class whose packet pacing is done in software.

    Subclasses supply individual samples by overriding :meth:`_get_sample`.
    This class groups ``samples_per_packet`` samples into one message, sends
    it, and then waits so that consecutive sends are at least ``sleep_time``
    seconds apart.

    NOTE(review): ``samples_per_packet``, ``logger``, ``config``,
    ``get_param``, ``_create_msg``, ``_create_mx_msg`` and ``_send`` are not
    defined here; they are presumably provided by ``PyAmplifier`` -- confirm.
    """

    def _manage_params(self):
        """Run the parent parameter handling, then set up pacing state."""
        super()._manage_params()
        # ``_next_msg`` is kept for backward compatibility; the sampling loop
        # itself stores its look-ahead message in ``_next_mx_msg``.
        self._next_msg = None
        self._next_mx_msg = None
        self._prev_ts = 0.0
        self._first_sample_ts = 0.0
        # Time budget of one packet: sample period times samples per packet.
        self.sleep_time = (
            (1.0 / float(self.get_param('sampling_rate')))
            * self.samples_per_packet
        )
        self.debug = streaming_debug.Debug(
            int(self.config.get_param('sampling_rate')),
            self.logger,
            self.samples_per_packet,
        )

    def _get_msg(self):
        """Collect one packet of samples and wrap it with ``_create_msg``.

        Returns:
            The value of ``self._create_msg(samples)``, or ``None`` as soon
            as ``_get_sample`` yields a ``None`` sample (any samples already
            gathered for the current packet are discarded).
        """
        t = time.time()
        # Spacing between consecutive samples inside one packet; used only
        # to synthesise timestamps the sample source did not provide.
        sample_interval = self.sleep_time / self.samples_per_packet
        samples = []
        for i in range(self.samples_per_packet):
            s, ts = self._get_sample()
            if s is None:
                return None
            if ts is None:
                ts = t + i * sample_interval
            samples.append((s, ts))
        return self._create_msg(samples)

    def _get_mx_msg(self, msg=None):
        """Return ``_create_mx_msg`` applied to *msg*, or to a fresh packet.

        Args:
            msg: An already built message; when ``None`` a new one is
                obtained from :meth:`_get_msg`.

        Returns:
            The value of ``self._create_mx_msg(msg)``, or ``None`` when no
            message could be obtained.
        """
        if msg is None:
            msg = self._get_msg()
        if msg is None:
            return None
        return self._create_mx_msg(msg)

    @log_crash
    def do_sampling(self):
        """Send packets until the sample source is exhausted, then exit.

        The next message is always prepared before waiting, so the wait only
        covers whatever remains of the ``sleep_time`` budget. The method ends
        by calling ``sys.exit(0)`` and therefore does not return normally.
        """
        msg = self._get_msg()
        if msg is None:
            # The source had no samples at all: shut down the same way as
            # when it runs dry later, instead of failing on ``msg.samples``.
            self.logger.info("No more samples. Abort amplifier...")
            sys.exit(0)
        self._first_sample_ts = msg.samples[0].timestamp
        self._next_mx_msg = self._get_mx_msg(msg)
        while True:
            self._prev_ts = time.time()
            self._send(self._next_mx_msg)
            self._next_mx_msg = self._get_mx_msg()
            if self._next_mx_msg is None:
                self.logger.info("No more samples. Abort amplifier...")
                break
            self._post_send()
            self.debug.next_sample()
            # Poll in 1 ms steps until the packet's time budget has elapsed.
            while time.time() - self._prev_ts < self.sleep_time:
                time.sleep(0.001)
        sys.exit(0)

    def _get_sample(self):
        """Return the next ``(sample, timestamp)`` pair; must be overridden.

        A ``None`` sample signals that no more data is available. A ``None``
        timestamp makes :meth:`_get_msg` synthesise one.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("To be subclassed")

    def _post_send(self):
        """Hook called after each send that is followed by another packet.

        Does nothing by default.
        """
        pass