Source code for obci.peers.test.dummy_signal_verifier


from obci.core.peer import Peer
from obci.utils.signal_generators import SawVerifier


[docs]class DummySignalVerifierPeer(Peer): """ Receives signal generated by `DummySignalReceiverPeer` and verifies saw signal. """ def __init__(self, *args, receiver_peer_name='Receiver', **kwargs): super().__init__(*args, **kwargs) self._saw_verifier = SawVerifier() self.signal_ok = None self._receiver_peer_name = receiver_peer_name
[docs] async def initialization_finished(self): await super().initialization_finished() self.register_message_handler('SampleVector', self.handle_signal_message) self.set_filter('SampleVector', self._receiver_peer_name)
[docs] async def handle_signal_message(self, msg): for d in msg.data.samples: data = d.channels # print('mean: {}'.format(data[1])) try: self._saw_verifier.verify_next(data[2]) if self.signal_ok is None: self.signal_ok = True except Exception: self.signal_ok = False