Source code for obci.analysis.obci_signal_processing.signal.data_generic_write_proxy
#!/usr/bin/env python3
# Author:
# Mateusz KruszyĆski <mateusz.kruszynski@gmail.com>
#
import struct
import sys
import os.path
from obci.configs import variables_pb2
from . import signal_exceptions
from . import signal_constants
from . import signal_logging as logger
LOGGER = logger.get_logger("data_generic_write_proxy", 'info')
SAMPLE_STRUCT_TYPES = signal_constants.SAMPLE_STRUCT_TYPES
[docs]class DataGenericWriteProxy(object):
"""
A class representing data file.
It should be an abstraction for saving raw data into a file.
Decision whether save signal to one or few separate files should be made here
and should be transparent regarding below interface - the interface should remain untouched.
Public interface:
- finish_saving() - closes data file and return its path,
- data_received(p_data_sample) - gets and saves next sample of signal
"""
def __init__(self, p_file_path, p_unpack_later=False, p_append_ts=False, p_sample_type='FLOAT'):
"""Open p_file_name file in p_dir_path directory."""
self._number_of_samples = 0
self._unpack_later = p_unpack_later
self._append_ts = p_append_ts
self._file_path = p_file_path
self._sample_struct_type = SAMPLE_STRUCT_TYPES[p_sample_type]
try:
if self._unpack_later:
# Create a temporary file with .tmp extension
# In finish_saving we read from that file and create another
# for efficency reasons
self._file = open(self._file_path + '.tmp', 'wb') # open file in a binary mode
else:
self._file = open(self._file_path, 'wb') # open file in a binary mode
except IOError:
LOGGER.error("Error! Can`t create a file!!!. path: " +
str(self._file_path))
sys.exit(1)
[docs] def data_received(self, p_data):
raise Exception("To be subclassed")
[docs] def set_data_len(self, ln, sm):
"""Set length of one unit of protobuf data
stored in temporary file. It`ll be useful
in finish_saving() while extracting data from the file."""
self._data_len = ln
self._samples_per_vector = sm
[docs] def set_first_sample_timestamp(self, timestamp):
self.first_sample_timestamp = timestamp
[docs] def finish_saving(self):
"""Close the file, return a tuple -
file`s name and number of samples."""
self._file.flush()
self._file.close()
if self._unpack_later:
return self._unpack_and_finish()
else:
return self._file_path, self._number_of_samples
def _unpack_and_finish(self):
final_file = open(self._file_path, 'w')
# Open once more temporary file with protobuf data
temp_file = open(self._file_path + '.tmp', 'r')
while True:
msg = temp_file.read(self._data_len)
if len(msg) == 0:
break
self._write_file(self._vect_to_string(msg), final_file)
final_file.flush()
final_file.close()
# Close and remove temporary file
temp_file.close()
os.remove(self._file_path + '.tmp')
return self._file_path, self._number_of_samples
def _write_file(self, str_data, f=None):
try:
if f:
f.write(str_data)
else:
self._file.write(str_data)
except ValueError:
LOGGER.error("Warning! Trying to write data to closed data file!")
return
def _vect_to_string(self, p_mx_vect):
l_vec = variables_pb2.SampleVector()
l_vec.ParseFromString(p_mx_vect)
samples = []
for j in range(len(l_vec.samples)):
s = l_vec.samples[j]
ts = s.timestamp
try:
strs = [struct.pack(self._sample_struct_type, ch) for ch in s.channels]
if self._append_ts:
strs.append(struct.pack(self._sample_struct_type, ts - self.first_sample_timestamp))
except struct.error:
LOGGER.error("Error while writhing to file. Bad sample format.")
raise signal_exceptions.BadSampleFormat()
samples.append(b''.join(strs))
return b''.join(samples)