Source code for obci.drivers.generic.py_amplifier

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from obci.mx_legacy.multiplexer_constants import peers, types
from obci.configs import variables_pb2
from obci.control.peer.configured_client import ConfiguredClient
from obci.utils.openbci_logging import log_crash


[docs]class PyAmplifier(ConfiguredClient): @log_crash def __init__(self, addresses, peer_type=peers.AMPLIFIER): super(PyAmplifier, self).__init__(addresses=addresses, type=peer_type) self._init() def _init(self): self._manage_params() self.ready() def _create_msg(self, samples): assert(self.samples_per_packet == len(samples)) v = variables_pb2.SampleVector() for sample, ts in samples: s = v.samples.add() s.channels.extend(sample) s.timestamp = ts return v def _create_mx_msg(self, msg): return msg.SerializeToString() def _send(self, msg): self.conn.send_message(message=msg, type=self.mx_signal_type, flush=True) def _manage_params(self): self.samples_per_packet = int(self.get_param("samples_per_packet")) self.mx_signal_type = types.__dict__[self.config.get_param("mx_signal_type")]