# Source code for obci.benchmark.averaging_peer
#!/usr/bin/env python3
import numpy
from obci.mx_legacy.multiplexer_constants import peers, types
from obci.control.peer.configured_multiplexer_server import ConfiguredMultiplexerServer
from obci.configs import settings, variables_pb2
from obci.utils.openbci_logging import log_crash
class AveragingPeer(ConfiguredMultiplexerServer):
    """Multiplexer peer that collapses every sample to the mean of its channels.

    Messages whose type equals the configured ``input_mx_type`` are parsed as
    ``variables_pb2.SampleVector``. For each incoming sample a new sample with
    a single channel (the ``numpy.mean`` of the incoming channels) and the
    same timestamp is emitted under the configured ``output_mx_type``.
    """

    @log_crash
    def __init__(self, addresses):
        """Initialise the peer, read its parameters and call ``ready()``.

        :param addresses: multiplexer addresses forwarded to the base class.
        """
        super().__init__(addresses=addresses, type=peers.STREAM_RECEIVER)
        self._manage_params()
        self.ready()

    def _manage_params(self):
        """Translate the configured message-type names into ``types`` values.

        :raises KeyError: if a configured name is not an attribute of the
            ``types`` module.
        """
        self.input_mx_type = types.__dict__[self.config.get_param("input_mx_type")]
        self.output_mx_type = types.__dict__[self.config.get_param("output_mx_type")]

    def handle_message(self, mxmsg):
        """Average the channels of each sample in a matching message.

        :param mxmsg: incoming multiplexer message; only its ``type`` and
            ``message`` attributes are used here.
        """
        if mxmsg.type == self.input_mx_type:
            in_vector = variables_pb2.SampleVector()
            in_vector.ParseFromString(mxmsg.message)

            out_vector = variables_pb2.SampleVector()
            for in_sample in in_vector.samples:
                out_sample = out_vector.samples.add()
                # One output channel per sample: the mean over all inputs.
                out_sample.channels.append(numpy.mean(in_sample.channels))
                out_sample.timestamp = in_sample.timestamp

            # NOTE(review): the protobuf object itself is passed as the
            # message, not ``SerializeToString()`` output - confirm that
            # ``conn.send_message`` accepts it.
            self.conn.send_message(
                message=out_vector, type=self.output_mx_type, flush=True
            )
        # NOTE(review): presumably tells the base class that no reply will be
        # sent for this message - confirm against ConfiguredMultiplexerServer.
        self.no_response()
if __name__ == "__main__":
    # Script entry point: build the peer against the configured multiplexer
    # addresses and hand control to its message loop.
    averaging_peer = AveragingPeer(settings.MULTIPLEXER_ADDRESSES)
    averaging_peer.loop()