Source code for obci.core.message_statistics
import time
from obci.core.messages import Message
class MsgPerfStats:
    """
    Count messages passed to :meth:`msg` and periodically print throughput
    statistics to stdout.

    The class does not run a timer of its own: the reporting interval is only
    checked when :meth:`msg` is called, so nothing is printed while no
    messages arrive.
    """

    def __init__(self, interval: float, name: str = '', *,
                 calc_size: bool = False) -> None:
        """
        :param interval: how often statistics will be printed, in seconds
        :param name: name of this counter; printed as a header line when
            non-empty
        :param calc_size: when true, every counted message is serialized to
            measure its size, and mean message size and MB/s throughput are
            added to the report
        """
        super().__init__()
        self._name = name
        self._interval = interval
        self._calc_size = calc_size
        self.reset()

    def reset(self) -> None:
        """
        Reset statistics and start a new measurement window.
        """
        # A monotonic clock is used so that wall-clock adjustments (NTP,
        # manual changes) cannot distort or negate the measured interval.
        now = time.monotonic()
        self._last_time = now
        self._start_time = now
        self._count = 0
        self._total_size = 0

    def msg(self, msg: 'Message') -> None:
        """
        Called to count new message into statistics.

        When more than ``interval`` seconds have passed since the current
        measurement window started, the statistics are printed and reset.

        :param msg: message to include into statistics; it is only accessed
            (through ``msg.serialize()``) when size calculation is enabled
        """
        self._last_time = time.monotonic()
        self._count += 1
        if self._calc_size:
            # serialize() is expected to yield sized chunks; their lengths
            # are summed to get the size of the whole message.
            self._total_size += sum(map(len, msg.serialize()))

        measurement_time = self._last_time - self._start_time
        # The explicit non-positive check protects the rate divisions in
        # _report() when a negative interval has been configured.
        if measurement_time <= 0 or measurement_time <= self._interval:
            return

        self._report(measurement_time)
        self.reset()

    def _report(self, measurement_time: float) -> None:
        """
        Print statistics gathered during the current measurement window.

        :param measurement_time: length of the window in seconds, must be
            greater than zero
        """
        lines = []
        if self._name:
            lines.append('stats for "{}"'.format(self._name))
        lines.append('message count: {:6d} [msgs]'.format(self._count))
        if self._calc_size:
            # _count is at least 1 here: msg() increments it before reporting.
            mean_size = self._total_size // self._count
            lines.append('mean message size: {:6d} [B]'.format(mean_size))
        messages_per_second = self._count / measurement_time
        lines.append('mean throughput: {:4.2f} [msg/s]'.format(messages_per_second))
        if self._calc_size:
            megabytes_per_second = (self._total_size / measurement_time) / 1e6
            lines.append('mean throughput: {:2.4f} [MB/s]'.format(megabytes_per_second))
        lines.append('measurement time: {:2.4f} [s]'.format(measurement_time))
        # Trailing empty entry yields the blank separator line after a report.
        lines.append('')
        print('\n'.join(lines))

    @property
    def interval(self) -> float:
        """
        How often message statistics will be printed to stdout. Specified in seconds.

        Assigning a new interval also resets the statistics gathered so far.

        .. note::
            Interval is checked only in self.msg function, this class doesn't use its own timer.
        """
        return self._interval

    @interval.setter
    def interval(self, interval: float) -> None:
        self._interval = interval
        self.reset()