Source code for obci.peers.test.dummy_amplifier


from obci.core.messages import Message
from obci.core.peer import Peer

from obci.utils.signal_generators import AsyncSignalGenerator


[docs]class DummyAmplifierPeer(Peer): """ Amplifier peer generates signal using AsyncSignalGenerator. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] async def initialization_finished(self): await super().initialization_finished() self.create_task(self.generate_test_signal())
[docs] async def generate_test_signal(self): sig_gen = AsyncSignalGenerator() async for samples in sig_gen: await self.send_message(Message('SampleVector', self.id, samples))