Source code for obci.acquisition.info_saver_peer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author:
#     Mateusz KruszyƄski <mateusz.kruszynski@titanis.pl>

import os.path
import time
import sys

from obci.mx_legacy.multiplexer_constants import peers, types
from obci.control.peer.configured_multiplexer_server import ConfiguredMultiplexerServer

from obci.configs import settings, variables_pb2
from obci.analysis.obci_signal_processing.signal import info_file_proxy
from obci.utils.openbci_logging import log_crash

INFO_FILE_EXTENSION = ".obci.xml"


[docs]class InfoSaver(ConfiguredMultiplexerServer): """A class for creating a manifest file with metadata.""" @log_crash def __init__(self, addresses, peer_type=peers.INFO_SAVER): super(InfoSaver, self).__init__(addresses=addresses, type=peer_type) # local params l_f_name = self.config.get_param("save_file_name") l_f_dir = self.config.get_param("save_file_path") self._file_path = os.path.expanduser(os.path.normpath(os.path.join( l_f_dir, l_f_name + INFO_FILE_EXTENSION))) self._info_proxy = info_file_proxy.InfoFileWriteProxy(self._file_path) self.append_ts = int(self.config.get_param("append_timestamps")) # external params self.freq = float(self.config.get_param("sampling_rate")) self.sample_type = self.config.get_param("sample_type") self.ch_nums = self.config.get_param("active_channels").split(";") self.ch_names = self.config.get_param("channel_names").split(";") self.ch_gains = [float(i) for i in self.config.get_param("channel_gains").split(";")] self.ch_offsets = [float(i) for i in self.config.get_param("channel_offsets").split(";")] self.ready()
[docs] def handle_message(self, mxmsg): """Handle messages: * signal_saver_control_message - a message from signal saver being a signal to finish saving.""" if mxmsg.type == types.__dict__[self.config.get_param("finished_signal_type")]: self.logger.info("Got signal saver finished!") l_vec = variables_pb2.VariableVector() l_vec.ParseFromString(mxmsg.message) l_num_of_samples = None l_file_path = None for i_var in l_vec.variables: if i_var.key == 'number_of_samples': l_num_of_samples = i_var.value elif i_var.key == 'file_path': l_file_path = i_var.value elif i_var.key == 'first_sample_timestamp': l_first_sample_ts = i_var.value self._finish_saving( l_num_of_samples, l_file_path, l_first_sample_ts) time.sleep(3) sys.exit(0)
def _finish_saving(self, p_number_of_samples, p_data_file_path, p_first_sample_ts): """Create xml manifest file with data received from hashtable and from signal saver (parameters in the function).""" l_signal_params = { 'number_of_channels': len(self.ch_nums), 'sampling_frequency': self.freq, 'sample_type': self.sample_type, 'channels_numbers': self.ch_nums, 'channels_names': self.ch_names, 'channels_gains': self.ch_gains, 'channels_offsets': self.ch_offsets, 'number_of_samples': p_number_of_samples, 'file': p_data_file_path, 'first_sample_timestamp': p_first_sample_ts } if self.append_ts: l_signal_params['number_of_channels'] += 1 l_signal_params["channels_numbers"].append("1000") # Add name to special channel l_signal_params["channels_names"].append("TSS") # Add gain to special channel l_signal_params["channels_gains"].append(1.0) # Add offset to special channel l_signal_params["channels_offsets"].append(0.0) l_log = "Finished saving info with values:\n" for i_key, i_value in l_signal_params.items(): l_log = ''.join([l_log, i_key, " : ", str(i_value), "\n"]) self.logger.info(l_log) self._info_proxy.finish_saving(l_signal_params) l_msg = self._serialise_finished_msg(l_signal_params) self.conn.send_message( message=l_msg, type=types.__dict__[self.config.get_param("finished_info_type")], flush=True) def _serialise_finished_msg(self, params): l_dict = { 'number_of_channels': str(params['number_of_channels']), 'sampling_frequency': str(params['sampling_frequency']), 'channels_numbers': ';'.join([str(i) for i in params['channels_numbers']]), 'channels_names': ';'.join(params['channels_names']), 'channels_gains': ';'.join([str(i) for i in params['channels_gains']]), 'channels_offsets': ';'.join([str(i) for i in params['channels_offsets']]), 'number_of_samples': str(params['number_of_samples']), 'file': str(params['file']), 'first_sample_timestamp': str(params['first_sample_timestamp']), # 'file_path':self._file_path } l_vec = variables_pb2.VariableVector() for k, v in l_dict.items(): l_var = l_vec.variables.add() l_var.key = k l_var.value = v return l_vec.SerializeToString()
if __name__ == "__main__": InfoSaver(settings.MULTIPLEXER_ADDRESSES).loop()