Source code for obci.utils.signal_generators
import time
import random
import asyncio
import numpy as np
from obci.core.messages.protobuf_serializer import proto
MAX_VAL = 10
"""int: maximal value of saw signal"""
[docs]def saw_generator(max_val: int = MAX_VAL):
"""
Saw signal generator.
:param max_val: maximal value, after which generator drops to 0 and restarts
"""
counter = 0
while True:
yield counter
counter += 1
if counter > max_val:
counter = 0
[docs]class SawVerifier:
"""Raises exception if saw signal is not valid."""
def __init__(self, max_val: int = MAX_VAL):
super().__init__()
self._saw_gen = saw_generator(max_val)
[docs] def verify_next(self, value):
assert next(self._saw_gen) == value
[docs]class AsyncSignalGenerator:
def __init__(self):
"""
Implements async generator for test signal.
Following channels are generated:
#. samples counter
#. :func:`time.time` value
#. :func:`time.monotonic` value
#. always 0.0
#. always 1.0
#. always -1.0
#. alternating sequence of 0 and 1
#. 100 Hz sinus
#. :func:`random.random` generated floats
#. saw signal
"""
super().__init__()
self._last_time = None
self._sampling_rate = 16.0
self._samples_per_iteration = 4
self._samples_delay = 1.0 / (self._sampling_rate / self._samples_per_iteration)
self._stop = False
self._samples_counter = 0
self._last_flip = 0
self._saw_gen = saw_generator()
def __aiter__(self):
return self
async def __anext__(self) -> proto.SampleVector:
if self._last_time is None:
self._last_time = time.monotonic()
sleep_duration = self._samples_delay - (time.monotonic() - self._last_time)
# When we are late with the next sample sleep_duration is < 0. Despite being late call
# asyncio.sleep(0) as it allows other, pending coroutines to be scheduled.
if sleep_duration < 0:
sleep_duration = 0
await asyncio.sleep(sleep_duration)
if self._stop:
raise StopAsyncIteration
self._last_time = time.monotonic()
return proto.SampleVector(samples=[self._get_next_sample() for _ in range(self._samples_per_iteration)])
def _get_next_sample(self) -> proto.Sample:
sample = proto.Sample(timestamp=time.time(), channels=np.array([
self._samples_counter,
time.time(),
time.monotonic(),
0.0,
1.0,
-1.0,
self._last_flip,
np.sin(2.0 * np.pi * 100.0 * self._samples_counter / self._sampling_rate), # 100 Hz sin
random.random(),
next(self._saw_gen)
], dtype=float))
self._samples_counter += 1
self._last_flip = 0 if self._last_flip == 1 else 1
return sample