Source code for obci.drivers.eeg.binary_driver_wrapper

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

from obci.control.peer.configured_multiplexer_server import ConfiguredMultiplexerServer
from obci.drivers.eeg.driver_comm import DriverComm
from obci.utils import context as ctx
from obci.utils.openbci_logging import log_crash

import json

# Delimiter for list-valued config parameters: used below to split
# 'active_channels' / 'channel_names' and to join 'channel_gains' /
# 'channel_offsets'.
SEP = ';'


class BinaryDriverWrapper(ConfiguredMultiplexerServer, DriverComm):
    """A wrapper around c++ amplifier binaries with INI configuration support.

    On construction the wrapper fetches the driver's JSON description,
    copies the fields listed in ``desc_params`` into the peer config and
    derives per-channel gains/offsets for the configured active channels.
    """

    # Maps a config parameter name to the key holding its value in the
    # JSON description returned by the driver.
    desc_params = dict(amplifier_name='name',
                       physical_channels_no='physical_channels',
                       sampling_rates='sampling_rates',
                       channels_info='channels')

    @log_crash
    def __init__(self, addresses, type):
        """Set up the peer, start the driver and publish its description.

        Do:
        1) run super constructor to receive configs
        2) run DriverComm constructor that fires binary driver
        3) get json description from the driver (desc_params are required)
        4) store that description in self.configs to share it with
           other modules
        5) if autostart is set to true:
        6) set driver params from config (sampling_rate and
           active_channels)
        7) start sampling

        :param addresses: multiplexer addresses, passed on to both base
            class constructors
        :param type: peer type, passed on to the super constructor
        """
        super(BinaryDriverWrapper, self).__init__(addresses=addresses,
                                                  type=type)
        self._init_got_configs()
        self._mx_addresses = addresses

        context = ctx.get_new_context()
        context['logger'] = self.logger
        DriverComm.__init__(self, self.config, addresses, context)

        desc = self.get_driver_description()
        if desc.startswith('DEVICE OPEN ERROR'):
            # NOTE(review): the statements below still run unless abort()
            # terminates the process -- confirm against DriverComm.abort.
            self.abort("DEVICE PROBLEM: " + desc + " ...ABORTING!!!")
        self.store_driver_description(desc)

        autostart = self.config.true_val(
            self.config.get_param('start_sampling'))
        self.logger.info('Automatic start: ' + str(autostart))
        self.ready()
        if autostart:
            self.set_driver_params()
            self.start_sampling()

    def _init_got_configs(self):
        """Hook called right after the super constructor; no-op here."""
        pass

    def store_driver_description(self, driver_output):
        """Parse the driver's JSON description and store it in the config.

        Every entry of ``desc_params`` is copied from the parsed description
        into ``self.config``; gains and offsets of the active channels are
        then extracted from the description's ``'channels'`` entry.

        :param driver_output: JSON text produced by the driver
        """
        if len(driver_output) < 500:
            # A short description is only reported; parsing still proceeds.
            self.logger.info("This does not look good: " + driver_output)
        amp_desc = json.loads(driver_output)
        for par, desc_par in self.desc_params.items():
            self.config.set_param(par, amp_desc[desc_par])
        self._extract_active_channels_info(amp_desc['channels'])

    def _extract_active_channels_info(self, channels_list):
        """Set 'channel_gains' and 'channel_offsets' for active channels.

        Reads the SEP-separated 'active_channels' and 'channel_names'
        params. If their lengths differ, logs an error, stops sampling and
        exits the process with status 1.

        :param channels_list: channel descriptions from the driver; each
            entry is indexed by 'name', 'gain' and 'offset'
        """
        active = self.get_param('active_channels').split(SEP)
        names = self.get_param('channel_names').split(SEP)
        if len(active) != len(names):
            self.logger.error("Active channels list length different than "
                              "channel names length!")
            self.stop_sampling()
            sys.exit(1)

        gains = []
        offsets = []
        for chan in active:
            info = self._find_channel_info(chan, channels_list)
            gains.append(info['gain'])
            offsets.append(info['offset'])

        gains_str = SEP.join([str(g) for g in gains])
        offset_str = SEP.join([str(o) for o in offsets])
        self.set_param('channel_gains', gains_str)
        self.set_param('channel_offsets', offset_str)
        self.logger.info('Set active channels: ' + str(active) +
                         ', channel_gains: ' + gains_str +
                         ', channel_offsets: ' + offset_str)

    def _find_channel_info(self, chan, channels_list):
        """Return the single channel description matching ``chan``.

        A channel matches when its 'name' equals ``chan`` or when its
        position in ``channels_list``, rendered as a string, equals
        ``chan``. On no match or more than one match the error is logged,
        sampling is stopped and the process exits with status 1.

        :param chan: channel name or index, as a string
        :param channels_list: channel descriptions from the driver
        :return: the matching entry of ``channels_list``
        """
        matches = [info for index, info in enumerate(channels_list)
                   if info['name'] == chan or str(index) == chan]
        if not matches:
            self.logger.error('Invalid channel name ' + chan)
            self.stop_sampling()
            sys.exit(1)
        elif len(matches) > 1:
            self.logger.error('Ambiguous channel name ' + chan + ': ' +
                              str(channels_list))
            self.stop_sampling()
            sys.exit(1)
        return matches[0]

    def abort(self, error_msg):
        """Record ``error_msg`` as 'error_details', then abort the driver.

        :param error_msg: description of the failure
        """
        self.set_param('error_details', error_msg)
        DriverComm.abort(self, error_msg)

    def handle_message(self, mxmsg):
        """Answer every incoming multiplexer message with no response.

        :param mxmsg: incoming message (not inspected)
        """
        # handle something
        self.no_response()

    def validate_params(self, params, amp_params_received=False):
        """Exit the process if any parameter is an empty string.

        The check runs only when ``amp_params_received`` is true; otherwise
        this method does nothing.

        :param params: mapping of parameter name to value
        :param amp_params_received: whether to perform the check
        """
        if amp_params_received:
            for par in params:
                if params[par] == '':
                    self.logger.error('Parameter ' + par +
                                      ' is empty!!! ABORTING....')
                    sys.exit(1)