Source code for obci.acquisition.acquisition_control
#!/usr/bin/python
# -*- coding: utf-8 -*-
from obci.mx_legacy.multiplexer_constants import peers, types
from obci.mx_legacy.clients import BaseMultiplexerServer
from obci.configs import settings, variables_pb2
from obci.utils import debug_helper
from obci.acquisition import acquisition_logging as logger
LOGGER = logger.get_logger("acquisition_control", 'info')
[docs]def finish_saving(mx_addresses=settings.MULTIPLEXER_ADDRESSES, s_types=['eeg']):
ctr = AcquisitionControl(mx_addresses, s_types)
ctr.send_finish_saving()
ctr.loop()
return ctr.result
[docs]def wait_saving_finished(mx_addresses=settings.MULTIPLEXER_ADDRESSES, s_types=['eeg']):
ctr = AcquisitionControl(mx_addresses, s_types)
ctr.loop()
return ctr.result
[docs]class AcquisitionControl(BaseMultiplexerServer):
"""A class for creating a manifest file with metadata."""
def __init__(self, addresses, s_types=['eeg']):
super(AcquisitionControl, self).__init__(addresses=addresses,
type=peers.ACQUISITION_CONTROL)
self._info_result = None
self._data_result = None
self._tags_result = None
self._wii_info_result = None
self._wii_data_result = None
self._wii_tags_result = None
self._wii_type = 'wii' in s_types
self._eeg_type = 'eeg' in s_types
assert(self._wii_type or self._eeg_type)
self.result = None
[docs] def send_finish_saving(self):
self.conn.send_message(message=b'finish',
type=types.ACQUISITION_CONTROL_MESSAGE,
flush=True)
def _all_ready(self):
eeg_ready = not (self._data_result is None or self._info_result is None or self._tags_result is None)
wii_ready = not (
self._wii_data_result is None or self._wii_info_result is None or self._wii_tags_result is None)
if self._wii_type and self._eeg_type:
return eeg_ready and wii_ready
elif self._wii_type:
return wii_ready
elif self._eeg_type:
return eeg_ready
else:
raise Exception("no wii or eeg signal type detecetd")
[docs] def handle_message(self, mxmsg):
v = variables_pb2.VariableVector()
if mxmsg.type == types.SIGNAL_SAVER_FINISHED:
LOGGER.info("got signal_saver_finished")
v.ParseFromString(mxmsg.message)
self._data_result = v
elif mxmsg.type == types.INFO_SAVER_FINISHED:
LOGGER.info("got info_saver_finished")
v.ParseFromString(mxmsg.message)
self._info_result = v
elif mxmsg.type == types.TAG_SAVER_FINISHED:
LOGGER.info("got tag_saver_finished")
v.ParseFromString(mxmsg.message)
self._tags_result = v
elif mxmsg.type == types.WII_BOARD_SIGNAL_SAVER_FINISHED:
LOGGER.info("got wii signal_saver_finished")
v.ParseFromString(mxmsg.message)
self._wii_data_result = v
elif mxmsg.type == types.WII_BOARD_INFO_SAVER_FINISHED:
LOGGER.info("got wii info_saver_finished")
v.ParseFromString(mxmsg.message)
self._wii_info_result = v
elif mxmsg.type == types.WII_BOARD_TAG_SAVER_FINISHED:
LOGGER.info("got wii tag_saver_finished")
v.ParseFromString(mxmsg.message)
self._wii_tags_result = v
else:
LOGGER.warning("Unrecognised message received!!!!")
self.no_response()
if self._all_ready():
self.result = (self._info_result, self._data_result, self._tags_result,
self._wii_info_result, self._wii_data_result, self._wii_tags_result
)
LOGGER.info(''.join(["Stop acquisition_control with result: ", "\n"]))
if self._eeg_type:
LOGGER.info(''.join(["info result:\n",
debug_helper.get_str_variable_vector(self._info_result),
"data result:\n",
debug_helper.get_str_variable_vector(self._data_result),
"tags result:\n",
debug_helper.get_str_variable_vector(self._tags_result)
]))
if self._wii_type:
LOGGER.info(''.join(["\n wii info result:\n",
debug_helper.get_str_variable_vector(self._wii_info_result),
"wii data result:\n",
debug_helper.get_str_variable_vector(self._wii_data_result),
"wii tags result:\n",
debug_helper.get_str_variable_vector(self._wii_tags_result)
]))
self.working = False