Source code for obci.peers.test.dummy_signal_verifier
from obci.core.peer import Peer
from obci.utils.signal_generators import SawVerifier
class DummySignalVerifierPeer(Peer):
    """
    Test peer that checks an incoming saw signal, sample by sample.

    Once initialization has finished, the peer handles ``'SampleVector'``
    messages (with a filter set to the configured receiver peer name, e.g.
    a ``DummySignalReceiverPeer``) and passes channel index 2 of every
    sample to a `SawVerifier`.

    The outcome is exposed through ``signal_ok``:

    * ``None``  -- no sample has been checked yet,
    * ``True``  -- every sample checked so far was accepted,
    * ``False`` -- at least one check raised; this value is never reset.
    """

    def __init__(self, *args, receiver_peer_name='Receiver', **kwargs):
        """
        Create the peer.

        :param receiver_peer_name: value passed to ``set_filter`` for
            ``'SampleVector'`` messages (presumably the name of the peer
            whose messages should be accepted -- confirm against
            ``Peer.set_filter``).

        All other positional and keyword arguments are forwarded to `Peer`.
        """
        super().__init__(*args, **kwargs)
        self._receiver_peer_name = receiver_peer_name
        # Tri-state verification result, see the class docstring.
        self.signal_ok = None
        self._saw_verifier = SawVerifier()

    async def initialization_finished(self):
        """
        Run the base-class hook, then subscribe to ``'SampleVector'``.
        """
        await super().initialization_finished()
        message_type = 'SampleVector'
        self.register_message_handler(message_type, self.handle_signal_message)
        self.set_filter(message_type, self._receiver_peer_name)

    async def handle_signal_message(self, msg):
        """
        Verify every sample carried by a ``'SampleVector'`` message.

        :param msg: message whose ``data.samples`` is iterable and whose
            items expose an indexable ``channels`` attribute.
        """
        for sample in msg.data.samples:
            channels = sample.channels
            try:
                # Only channel index 2 is checked (presumably the one that
                # carries the saw wave -- confirm against the generator).
                self._saw_verifier.verify_next(channels[2])
            except Exception:
                # Any failure, including a missing channel, latches the
                # result to "bad" for the rest of the peer's lifetime.
                self.signal_ok = False
            else:
                # Promote "unknown" to "good"; never overwrite a failure.
                if self.signal_ok is None:
                    self.signal_ok = True