Source code for obci.utils.zmq
import asyncio
import time
from typing import List
import zmq
def bind_to_urls(socket, urls: List[str]) -> List[str]:
    """
    Bind socket to a given list of URLs.

    If duplicate URLs are given this function binds only once per URL.
    URLs are bound in the order of their first occurrence and the returned
    list of real bound URLs is unique and keeps that same order, so the
    result is deterministic between runs.

    :param zmq.Socket socket: ZMQ socket to bind
    :param list[str] urls: list of URLs to bind to
    :return: list of real bound URLs, as reported by the socket's
        ``LAST_ENDPOINT`` option right after each bind
    """
    # A dict is used as an ordered set: keys are unique and keep insertion
    # order, unlike ``set`` whose iteration order for strings changes between
    # interpreter runs.
    listening_urls = {}
    for url in dict.fromkeys(urls):
        socket.bind(url)
        # Ask the socket which endpoint it actually bound to; this is the
        # value returned to the caller instead of the requested URL.
        real_url = socket.getsockopt(zmq.LAST_ENDPOINT)
        if real_url:
            listening_urls[real_url.decode()] = None
    return list(listening_urls)
class TimeoutException(Exception):
    """
    Signals that `recv_multipart_with_timeout` gave up waiting because its
    timeout elapsed before a message could be received.
    """
async def recv_multipart_with_timeout(socket,
                                      timeout: float = 1.0,
                                      sleep_interval: float = 0.01
                                      ) -> List[bytes]:
    """
    Receive a multipart message, giving up after ``timeout`` seconds.

    This wrapper exists because of a bug in socket.recv_multipart function
    (zmq.asyncio sockets ignore RCVTIMEO option).
    For more information see: https://github.com/zeromq/pyzmq/issues/825.

    The socket is polled with non-blocking receives; between attempts the
    coroutine sleeps, yielding control to the event loop.

    :param socket: socket whose ``recv_multipart`` is awaited with the
        ``zmq.NOBLOCK`` flag
    :param float timeout: maximum time in seconds to wait for a message
    :param float sleep_interval: pause in seconds between receive attempts
    :return: the value produced by ``socket.recv_multipart``
        (the frames of the received message)
    :raises TimeoutException: if no message was received before the timeout
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return await socket.recv_multipart(zmq.NOBLOCK)
        except zmq.error.Again:
            # Nothing to read yet: either give up or wait and retry.
            remaining = deadline - time.monotonic()
            if remaining < 0:
                raise TimeoutException(
                    "no message received within {} s".format(timeout))
            # Never sleep past the deadline, otherwise a large sleep_interval
            # would make the call overshoot the requested timeout.
            await asyncio.sleep(min(sleep_interval, remaining))