Source code for obci.drivers.eeg.driver_discovery.amplifier_tmsi_usb_discovery
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import os
from obci.control.launcher.launcher_tools import obci_root, READY_TO_LAUNCH
from obci.control.peer.peer_config import PeerConfig
from obci.drivers.eeg.driver_comm import DriverComm
_DESC_BASE_PATH = os.path.join(obci_root(), 'drivers/eeg/driver_discovery')
_TYPES = ['Porti7']
_USB_DESCS = {'Porti7': 'amplifier_porti7_usb.json',
'SynFi': 'amplifier_tmsi_synfi.json'}
_AMP_PEER = 'drivers/eeg/cpp_amplifiers/amplifier_tmsi.py'
_AMP_EXECUTABLE = 'tmsi_amplifier'
_SCENARIO = 'scenarios/amplifier/tmsi_amp_signal.ini'
def _find_usb_amps():
dev_path = os.path.realpath('/dev')
amps = [dev for dev in os.listdir(dev_path) if
dev.startswith('fusbi')]
synfi_amps = [dev for dev in os.listdir(dev_path) if
dev.startswith('synfi')]
amps = [os.path.join(dev_path, dev) for dev in amps]
synfi_amps = [os.path.join(dev_path, dev) for dev in synfi_amps]
amps = [(dev, '', 'Porti7') for dev in amps if os.readlink(dev).startswith('tmsi')]
synfi_amps = [(dev, '', 'SynFi') for dev in synfi_amps if os.readlink(dev).startswith('tmsi')]
amps += synfi_amps
print("Found USB devices: ", amps)
return amps
[docs]def get_description_from_driver(device_path):
conf = PeerConfig('amplifier')
conf.add_local_param('driver_executable', _AMP_EXECUTABLE)
conf.add_local_param('samples_per_packet', '4')
conf.add_local_param('bluetooth_device', '')
conf.add_local_param('usb_device', device_path)
driv = DriverComm(conf, catch_signals=False)
descr = driv.get_driver_description()
try:
dic = json.loads(descr)
except ValueError as e:
print("AMPLIFIER ", device_path, "IS PROBABLY BUSY.", end=' ')
print("Invalid channel description: ", descr)
dic = None
driv.terminate_driver()
return dic
[docs]def driver_descriptions():
descriptions = []
usb = _find_usb_amps()
for amp in usb:
desc = {
'experiment_info': {
"launch_file_path": _SCENARIO,
'experiment_status': {
'status_name': READY_TO_LAUNCH
}
},
'amplifier_peer_info': {
'driver_executable': _AMP_EXECUTABLE,
'path': _AMP_PEER},
'amplifier_params': {
'additional_params': {'usb_device': '',
'bluetooth_device': ''},
'active_channels': '',
'channel_names': '',
'sampling_rate': ''},
}
desc['amplifier_params']['additional_params']['usb_device'] = amp[0]
# with open(os.path.join(_DESC_BASE_PATH, _USB_DESCS[amp[2]])) as f:
# desc['amplifier_params']['channels_info'] = json.load(f)
chan_inf = get_description_from_driver(amp[0])
if chan_inf is None:
# amplifier busy or an error occurred
continue
else:
desc['amplifier_params']['channels_info'] = chan_inf
descriptions.append(desc)
return descriptions