Source code for obci.peers.test.dummy_amplifier
from obci.core.messages import Message
from obci.core.peer import Peer
from obci.utils.signal_generators import AsyncSignalGenerator
class DummyAmplifierPeer(Peer):
    """
    Dummy amplifier peer.

    Iterates an ``AsyncSignalGenerator`` and sends every item it yields
    as a ``'SampleVector'`` message carrying this peer's id.
    """

    def __init__(self, *args, **kwargs):
        # No state of its own: every argument is forwarded to Peer unchanged.
        super().__init__(*args, **kwargs)

    async def initialization_finished(self):
        """Run the base-class hook, then pass the signal coroutine to ``create_task``."""
        await super().initialization_finished()
        signal_coro = self.generate_test_signal()
        self.create_task(signal_coro)

    async def generate_test_signal(self):
        """Send one ``'SampleVector'`` message per item yielded by the generator."""
        async for batch in AsyncSignalGenerator():
            message = Message('SampleVector', self.id, batch)
            await self.send_message(message)