Source code for obci.core.zmq_asyncio_task_manager

from typing import Optional

import zmq
import zmq.asyncio

from obci.core import OBCI_DEBUG
from .asyncio_task_manager import AsyncioTaskManager


class ZmqAsyncioTaskManager(AsyncioTaskManager):
    """`AsyncioTaskManager` extended with ZMQ context management.

    The manager either creates its own `zmq.asyncio.Context` or uses one
    supplied by the caller. Only a context created by this object is
    destroyed in `_cleanup`.
    """

    _thread_name = 'ZmqAsyncioTaskManager'
    _logger_name = 'ZmqAsyncioTaskManager'

    @staticmethod
    def new_event_loop() -> zmq.asyncio.ZMQEventLoop:
        """
        Overloaded function to create ZMQEventLoop instances.

        Debug mode is switched on for the new loop when `OBCI_DEBUG` is set.

        :return: new instance of ZMQEventLoop
        """
        loop = zmq.asyncio.ZMQEventLoop()
        if OBCI_DEBUG:
            loop.set_debug(True)
        return loop

    def __init__(self,
                 asyncio_loop: Optional[zmq.asyncio.ZMQEventLoop] = None,
                 zmq_context: Optional[zmq.asyncio.Context] = None,
                 zmq_io_threads: int = 1
                 ) -> None:
        """
        Adds ZMQ context management to `AsyncioTaskManager`.

        :param asyncio_loop: existing message loop or `None` if new message
            loop should be created
        :param zmq_context: existing asyncio ZMQ context or `None` if new
            context should be created
        :param zmq_io_threads: number of ZMQ I/O threads; used only when a
            new context is created
        """
        # NOTE: this check is an `assert`, so it is skipped when Python runs
        # with -O; it is kept as-is to preserve the raised exception type.
        assert zmq_context is None or isinstance(zmq_context, zmq.asyncio.Context)
        super().__init__(asyncio_loop)
        self._zmq_io_threads = zmq_io_threads

        if zmq_context is None:
            # No context supplied: create one and take responsibility for
            # destroying it in `_cleanup`.
            self._owns_ctx = True
            self._ctx = zmq.asyncio.Context(io_threads=self._zmq_io_threads)
        else:
            # Caller-supplied context: use it, but never destroy it here.
            self._owns_ctx = False
            self._ctx = zmq_context

    @property
    def owns_zmq_context(self) -> bool:
        """True if ZMQ context is owned by this object."""
        return self._owns_ctx

    def _cleanup(self) -> None:
        """
        Destroy the owned ZMQ context (if any), then run the base cleanup.

        If this class owns ZMQ context remember to close all sockets
        before calling this function through `super()._cleanup()`.

        The base-class cleanup runs even when destroying the context raises,
        and a repeated call does not try to destroy the context again.
        """
        try:
            # `_ctx` is reset to None after a successful destroy, so the
            # second condition makes a repeated cleanup a no-op for the
            # context instead of an AttributeError on `None.destroy`.
            if self._owns_ctx and self._ctx is not None:
                self._ctx.destroy(linger=0)
                self._ctx = None
        finally:
            # Always release base-class resources, even if destroy() failed.
            super()._cleanup()