Source code for obci.core.peer

import asyncio
import time
import uuid
import argparse
import types
from collections import namedtuple
from typing import Optional, Callable, List, Union, Any

import zmq
import zmq.asyncio

from .message_statistics import MsgPerfStats
from .messages import Message, types as msg_types
from .asyncio_task_manager import ensure_not_inside_msg_loop
from .zmq_asyncio_task_manager import ZmqAsyncioTaskManager
from .message_handler_mixin import MessageHandlerMixin
from ..utils.zmq import bind_to_urls, recv_multipart_with_timeout

# Types used to annotate the value returned by ``Peer.query`` / ``Peer.query_async``.
QueryDataType = Union[dict, str, int, float, type(None), List[Any]]
# Callable accepted as a query handler: takes a ``Message`` and either returns
# a ``Message`` directly or is a coroutine function (annotated as returning a coroutine).
QueryHandler = Union[Callable[[Message], Message], Callable[[Message], types.CoroutineType]]


class PeerInitUrls(namedtuple('PeerInitUrls', ['pub_urls', 'rep_urls', 'broker_rep_url'])):
    """
    List of initial URL addresses.

    :param list pub_urls: list of PUB URLs to bind to
    :param list rep_urls: list of REP URLs to bind to
    :param str broker_rep_url: broker's REP URL
    """
class TooManyRedirectsException(Exception):
    """Raised when too many redirects occurred in Peer.query method."""
class QueryAnswerUnknown(Exception):
    """Raised when answer to query is unknown."""
class MultiplePeersAvailable(Exception):
    """
    Raised in Peer.query method when more than one peer can answer specified query.

    Caller must decide which peer to ask and reissue query by calling Peer.query method
    with initial_peer parameter set to one of supplied peers.
    """

    def __init__(self, peers: List[str], *args, **kwargs):
        """
        :param peers: list of peers able to answer the query
        :param args: forwarded unchanged to ``Exception.__init__``
        :param kwargs: forwarded unchanged to ``Exception.__init__``
        """
        super().__init__(*args, **kwargs)
        self.peers = peers

    def __str__(self):
        # Base exception text followed by a comma-separated list of the candidate peers.
        peers_text = ', '.join(str(peer) for peer in self.peers)
        return super().__str__() + ': ' + peers_text
class Peer(ZmqAsyncioTaskManager, MessageHandlerMixin):
    """Base peer: binds PUB/REP sockets, registers with the broker and dispatches messages."""

    def __init__(self,
                 urls: Union[str, PeerInitUrls],
                 peer_id: Optional[str] = None,
                 asyncio_loop: Optional[zmq.asyncio.ZMQEventLoop] = None,
                 zmq_context: Optional[zmq.asyncio.Context] = None,
                 zmq_io_threads: int = 1,
                 hwm: int = 1000
                 ) -> None:
        """
        Base peer class. All peers derive from this class.

        :param urls: string or PeerInitUrls with initial bootstrap addresses
        :param peer_id: globally unique identifier
        :param asyncio_loop: existing ZMQ asyncio message loop or `None` if loop is requested
        :param zmq_context: existing ZMQ asyncio context or `None` if new context is requested
        :param zmq_io_threads: number of ZMQ I/O threads
        :param hwm: ZMQ high water mark
        """
        assert isinstance(urls, (str, PeerInitUrls))

        self._id = str(uuid.uuid1()) if peer_id is None else str(peer_id)

        # Names must be set before the base class initializer runs.
        peer_name = 'Peer_{}'.format(self.id)
        self._thread_name = peer_name
        self._logger_name = peer_name

        super().__init__(asyncio_loop, zmq_context, zmq_io_threads)

        self._initialization_finished = False
        self._hwm = hwm
        self._pub = None  # PUB socket for sending messages to broker XSUB
        self._sub = None  # SUB socket for receiving messages from broker's XPUB
        self._rep = None  # synchronous requests from peers
        self._broker_rep_url = None
        self._broker_xpub_url = None
        self._broker_xsub_url = None

        # listening URLs after binding (e.g. with specific port numbers
        # when * was given as port number or as IP address)
        self._pub_urls = []
        self._rep_urls = []
        self._pub_listening_urls = []
        self._rep_listening_urls = []

        # message types for which a query handler was registered with the broker
        self._query_handlers = []

        if isinstance(urls, str):
            self._ip_autodiscovery = True
            self._pub_urls = ['tcp://*:*']
            self._rep_urls = ['tcp://*:*']
            self._broker_tcp_ip_address = urls
            self._broker_rep_url = None  # TODO: fixme
        else:
            self._ip_autodiscovery = False
            assert isinstance(urls.pub_urls, (list, tuple))
            assert isinstance(urls.rep_urls, (list, tuple))
            # Copy into private lists: these are extended in place with the
            # broker-supplied extra URLs, which must neither mutate the
            # caller's objects nor fail when a tuple was supplied.
            self._pub_urls = list(urls.pub_urls)
            self._rep_urls = list(urls.rep_urls)
            self._broker_rep_url = urls.broker_rep_url

        # heartbeat
        self._heartbeat_enabled = False
        self._heartbeat_delay = 0.05  # 50 ms

        self._max_query_redirects = 10

        # logs verbosity
        self._log_messages = True
        self._log_peers_info = True

        # message statistics
        stats_interval = 4.0

        # async send statistics
        self._calc_send_stats = False
        self._send_stats = MsgPerfStats(stats_interval, 'SEND')

        # async receive statistics
        self._calc_recv_stats = False
        self._recv_stats = MsgPerfStats(stats_interval, 'RECV')

        self.create_task(self._connect_to_broker())

    # peer id is read only
    @property
    def id(self) -> str:
        """str: Unique identification string. Read only property.

        Can be set only when creating a new peer instance.
        """
        return self._id

    @classmethod
    def create_peer(cls, argv: List[str]) -> 'Peer':
        """
        Parse supplied argv and create new Peer instance.

        Unknown arguments are ignored. Every parsed option except the URL
        related ones is forwarded to the constructor as a keyword argument.

        :param argv: command line arguments
        :return: new peer instance
        """
        parser = argparse.ArgumentParser(add_help=False)
        cls.add_arguments(parser)
        options = parser.parse_known_args(argv)[0]

        if options.broker_ip is not None:
            urls = options.broker_ip
        else:
            # NOTE(review): --pub-urls / --rep-urls are optional, so these may be None
            # here, while __init__ asserts they are lists or tuples - confirm intended default.
            urls = PeerInitUrls(pub_urls=options.pub_urls,
                                rep_urls=options.rep_urls,
                                broker_rep_url=options.broker_rep_url)

        exclude_list = ['broker_ip', 'pub_urls', 'rep_urls', 'broker_rep_url']
        extra_kwargs = {k: v for k, v in vars(options).items() if k not in exclude_list}
        return cls(urls, **extra_kwargs)

    @classmethod
    def add_arguments(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
        """
        Add command line args recognized by this class when using `create_peer` method.

        :param parser: parser to extend
        :return: the same parser object
        """
        # Exactly one of --broker-ip / --broker-rep-url must be given.
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument('--broker-ip')
        group.add_argument('--broker-rep-url')
        parser.add_argument('--pub-urls', nargs='+', required=False)
        parser.add_argument('--rep-urls', nargs='+', required=False)
        return parser

    @staticmethod
    def __get_filter_bytes(msg_type: str, msg_subtype: Optional[str] = None) -> bytes:
        """Build subscription filter: UTF-8 encoded `msg_type` or `msg_type^msg_subtype`."""
        return (msg_type + ('' if msg_subtype is None else '^' + msg_subtype)).encode('utf-8')

    def set_filter(self, msg_type: str, msg_subtype: Optional[str] = None) -> None:
        """
        Subscribe for messages with `msg_type` message type.

        Peer must be initialized to use this function; when the SUB socket
        does not exist the call does nothing.

        :param msg_type: message type to subscribe to
        :param msg_subtype: optional message subtype narrowing the subscription
        """
        if self._sub is not None:
            self._sub.subscribe(self.__get_filter_bytes(msg_type, msg_subtype))

    def remove_filter(self, msg_type: str, msg_subtype: Optional[str] = None) -> None:
        """
        Unsubscribe for messages with `msg_type` message type.

        Peer must be initialized to use this function; when the SUB socket
        does not exist the call does nothing.

        :param msg_type: message type to unsubscribe from
        :param msg_subtype: optional message subtype used when subscribing
        """
        if self._sub is not None:
            self._sub.unsubscribe(self.__get_filter_bytes(msg_type, msg_subtype))

    def _cleanup(self) -> None:
        """
        Close ZMQ sockets. Set initialization state to `False`.

        Note:
            Always remember to call `super()._cleanup()` when overloading this function.
        """
        self._initialization_finished = False
        # Sockets are still None when cleanup runs before they were created
        # (or runs a second time), so close only those that exist.
        for socket in (self._pub, self._sub, self._rep):
            if socket is not None:
                socket.close(linger=0)
        self._pub = None
        self._sub = None
        self._rep = None
        super()._cleanup()

    async def _connect_to_broker(self) -> None:
        """
        Create PUB, SUB and REP sockets and perform the broker handshake.

        On success marks the peer as initialized and schedules `heartbeat`
        and `initialization_finished`. On failure logs and re-raises.
        """
        try:
            self._pub = self._ctx.socket(zmq.PUB)
            self._sub = self._ctx.socket(zmq.SUB)
            self._rep = self._ctx.socket(zmq.REP)
            for socket in [self._pub, self._sub, self._rep]:
                socket.set_hwm(self._hwm)
                socket.set(zmq.LINGER, 0)
            await self.__connect_to_broker_impl()
        except Exception:
            self._logger.exception("initialization failed for peer '{}': ".format(self.id))
            raise
        else:
            self._initialization_finished = True
            self.create_task(self.heartbeat())
            self.create_task(self.initialization_finished())

    async def __connect_to_broker_impl(self) -> None:
        """
        Bind PUB/REP sockets, exchange BROKER_HELLO and BROKER_REGISTER_PEER
        with the broker, then connect SUB/PUB to the broker's XPUB/XSUB URLs.
        """
        self._pub_listening_urls = bind_to_urls(self._pub, self._pub_urls)
        self._rep_listening_urls = bind_to_urls(self._rep, self._rep_urls)

        if self._log_peers_info:
            msg = ("\n"
                   "Peer '{}': Initial PUB & REP bind finished.\n"
                   "PUB: {}\n"
                   "REP: {}\n"
                   "\n").format(self.id,
                                ', '.join(self._pub_listening_urls),
                                ', '.join(self._rep_listening_urls))
            self._logger.debug(msg)

        # TODO: implement self._ip_autodiscovery = True
        if self._ip_autodiscovery:
            raise NotImplementedError('self._ip_autodiscovery = True not implemented')

        # send hello to broker, receive extra URLs to bind PUB and REP sockets to
        response = await self.send_broker_message(
            Message(msg_types.BROKER_HELLO, self.id, {
                'pub_urls': self._pub_listening_urls,
                'rep_urls': self._rep_listening_urls
            }),
            timeout=30.0
        )
        self._pub_urls += response.data['extra_pub_urls']
        self._rep_urls += response.data['extra_rep_urls']
        self._pub_listening_urls += bind_to_urls(self._pub, response.data['extra_pub_urls'])
        self._rep_listening_urls += bind_to_urls(self._rep, response.data['extra_rep_urls'])

        if self._log_peers_info:
            msg = ("\n"
                   "Peer '{}': After BROKER_HELLO.\n"
                   "PUB: {}\n"
                   "REP: {}\n"
                   "\n").format(self.id,
                                ', '.join(self._pub_listening_urls),
                                ', '.join(self._rep_listening_urls))
            self._logger.debug(msg)

        # after binding PUB and REP sockets send real URLs to the broker
        # and receive broker's XPUB port to connect SUB to
        response = await self.send_broker_message(
            Message(msg_types.BROKER_REGISTER_PEER, self.id, {
                'pub_urls': self._pub_listening_urls,
                'rep_urls': self._rep_listening_urls
            }),
            timeout=30.0
        )
        self._broker_xpub_url = response.data['xpub_url']
        self._broker_xsub_url = response.data['xsub_url']
        self._sub.connect(self._broker_xpub_url)
        self._pub.connect(self._broker_xsub_url)

        if self._log_peers_info:
            msg = ("\n"
                   "Peer '{}'. Connect to Broker finished.\n"
                   "Connected to broker at REP {}; XPUB {}\n"
                   "PUB URLs: {}\n"
                   "REP URLs: {}\n"
                   "\n").format(self.id,
                                self._broker_rep_url,
                                self._broker_xpub_url,
                                ', '.join(self._pub_listening_urls),
                                ', '.join(self._rep_listening_urls))
            self._logger.info(msg)

    async def initialization_finished(self) -> None:
        """
        Run when initialization finished and all message sending mechanisms are available to use.

        Starts the receive loops for synchronous (REP) and asynchronous (SUB) messages.
        """
        self.create_task(self._receive_sync_messages())
        self.create_task(self._receive_async_messages())

    async def heartbeat(self) -> None:
        """
        Periodically send HEARTBEAT messages.

        A message is sent only while `_heartbeat_enabled` is set; the loop
        keeps running either way. Time spent sending is subtracted from the
        delay before the next iteration.
        """
        heartbeat_message = Message(msg_types.HEARTBEAT, self.id)
        while True:
            heartbeat_timestamp = time.monotonic()
            if self._heartbeat_enabled:
                await self.send_message(heartbeat_message)
            elapsed = time.monotonic() - heartbeat_timestamp
            await asyncio.sleep(max(0, self._heartbeat_delay - elapsed))

    async def send_broker_message(self, msg: Message, timeout: float = 5.0) -> Message:
        """
        Send message to Broker and return answer.

        :param msg: message object to send
        :param timeout: timeout in seconds
        :return: response message
        """
        return await self.send_message_to_peer(self._broker_rep_url, msg, timeout)

    async def send_message_to_peer(self, url: str, msg: Message, timeout: float = 5.0) -> Message:
        """
        Send message to specified peer and return answer.

        A fresh REQ socket is created for every call and closed afterwards.

        :param url: peer's REP socket URL
        :param msg: message object to send
        :param timeout: timeout in seconds
        :return: response message
        """
        if self._log_messages:
            self._logger.debug("sending sync message to '{}': type '{}', subtype '{}'"
                               .format(url, msg.type, msg.subtype))
        req = self._ctx.socket(zmq.REQ)
        try:
            # connect() is inside the try block so the socket is closed
            # even when the URL is rejected.
            req.connect(url)
            await req.send_multipart(msg.serialize())
            response = await recv_multipart_with_timeout(req, timeout)
        finally:
            req.close(linger=0)
        return Message.deserialize(response)

    async def send_message(self, msg: Message) -> None:
        """
        Send broadcast message.

        :param msg: message object to send
        """
        if self._log_messages:
            self._logger.debug("sending async message: type '{}', subtype '{}'"
                               .format(msg.type, msg.subtype))
        serialized_msg = msg.serialize()
        if self._calc_send_stats:
            self._send_stats.msg(serialized_msg)
        await self._pub.send_multipart(serialized_msg)

    @ensure_not_inside_msg_loop
    def query(self,
              query_type: str,
              query_params: Optional[dict] = None,
              initial_peer: Optional[str] = None
              ) -> QueryDataType:
        """
        Send query message to Broker (or any other peer if `initial_peer` is specified).

        Returned value can be any JSON-serializable object.

        :param query_type: query type
        :param query_params: optional query parameters
        :param initial_peer: if specified this peer will be asked instead of Broker
        :return: query response
        """
        return self.create_task(self.query_async(query_type, query_params, initial_peer)).result()

    async def query_async(self,
                          query_type: str,
                          query_params: Optional[dict] = None,
                          initial_peer: Optional[str] = None
                          ) -> QueryDataType:
        """
        Async version of :func:`Peer.query`.

        :param query_type: query type
        :param query_params: optional query parameters
        :param initial_peer: if specified this peer will be asked instead of Broker
        :return: query response
        :raises MultiplePeersAvailable: a redirect named more than one peer
        :raises QueryAnswerUnknown: the answer was INVALID_REQUEST or INTERNAL_ERROR
        :raises TooManyRedirectsException: the redirect limit was reached
        """
        if query_params is None:
            query_params = {}
        query_msg = Message(query_type, self.id, query_params)
        url = self._broker_rep_url if initial_peer is None else initial_peer
        redirects = 0
        while True:
            response = await self.send_message_to_peer(url, query_msg)
            if response.type == msg_types.REDIRECT:
                urls = response.data['peers']
                assert len(urls) > 0
                if len(urls) > 1:
                    raise MultiplePeersAvailable(urls, 'Multiple peers can answer this query')
                if len(urls) == 1:
                    # second element of the peer entry is used as the URL to ask next
                    url = urls[0][1]
            elif response.type in (msg_types.INVALID_REQUEST, msg_types.INTERNAL_ERROR):
                raise QueryAnswerUnknown()
            else:
                return response.data

            redirects += 1
            if redirects >= self._max_query_redirects:
                self._logger.error("max redirects ({}) reached when executing query '{}'"
                                   .format(self._max_query_redirects, query_type))
                raise TooManyRedirectsException('max redirects reached')

    @ensure_not_inside_msg_loop
    def register_query_handler(self, msg_type: str, handler: QueryHandler) -> None:
        """
        Register callback handler for specified query type.

        `handler` function must return a valid `Message` object.

        :param msg_type: query type
        :param handler: function to execute when specified query is received
        """
        # NOTE(review): `.exception()` returns the task's exception instead of raising it,
        # so a failed registration is not reported to the caller - confirm this is intended.
        self.create_task(self.register_query_handler_async(msg_type, handler)).exception()

    async def register_query_handler_async(self, msg_type: str, handler: QueryHandler) -> None:
        """
        Async version of :func:`Peer.register_query_handler`.

        :param msg_type: query type
        :param handler: function to execute when specified query is received
        :raises Exception: broker's answer was not OK
        """
        response = await self.send_broker_message(
            Message(msg_types.BROKER_REGISTER_QUERY_HANDLER, self.id, {'msg_type': msg_type}))
        if response.type != msg_types.OK:
            raise Exception("registering query handler for '{}' failed: broker responded with '{}'"
                            .format(msg_type, response.type))
        # Keep one entry per type so that unregistering all handlers
        # does not process the same type twice.
        if msg_type not in self._query_handlers:
            self._query_handlers.append(msg_type)
        self.register_message_handler(msg_type, handler)

    @ensure_not_inside_msg_loop
    def unregister_query_handler(self, msg_type: Optional[str] = None) -> None:
        """
        Unregister callback handler for specified query type
        or for all query types if `msg_type` is `None`.

        :param msg_type: query type
        """
        # NOTE(review): `.exception()` returns the task's exception instead of raising it,
        # so a failure is not reported to the caller - confirm this is intended.
        self.create_task(self.unregister_query_handler_async(msg_type)).exception()

    async def unregister_query_handler_async(self, msg_type: Optional[str] = None) -> None:
        """
        Async version of :func:`Peer.unregister_query_handler`.

        :param msg_type: query type or `None` to unregister all query handlers
        :raises Exception: broker's answer was not OK
        """
        response = await self.send_broker_message(
            Message(msg_types.BROKER_UNREGISTER_QUERY_HANDLER, self.id, {'msg_type': msg_type}))
        if response.type != msg_types.OK:
            raise Exception("unregistering query handler for '{}' failed: broker responded with '{}'"
                            .format(msg_type, response.type))
        if msg_type is None:
            for q_type in list(self._query_handlers):
                self.unregister_message_handler(q_type)
            # forget the handlers so that a later call does not unregister them again
            self._query_handlers.clear()
        else:
            self.unregister_message_handler(msg_type)
            self._query_handlers = [q_type for q_type in self._query_handlers
                                    if q_type != msg_type]

    async def _receive_sync_messages(self) -> None:
        """Serve requests arriving on the REP socket; every request gets exactly one reply."""

        async def sync_handler():
            # Stays None until a serialized reply exists. Without this the `finally`
            # clause would hit an unbound name when the handler is interrupted by
            # something that is not an `Exception` (e.g. task cancellation).
            response = None
            try:
                try:
                    msg_raw = await self._rep.recv_multipart()
                    msg = Message.deserialize(msg_raw)
                    if self._log_messages:
                        self._logger.debug("received sync message: type '{}', subtype: '{}'"
                                           .format(msg.type, msg.subtype))
                    reply = await self.handle_message(msg)
                    if not isinstance(reply, Message):
                        raise Exception("Bad handler")
                    response = reply.serialize()
                except Exception as ex:
                    # the requester still gets an answer describing the failure
                    response = Message(msg_types.INTERNAL_ERROR, self.id, str(ex)).serialize()
                    raise
                finally:
                    if response is not None:
                        await self._rep.send_multipart(response)
            except Exception:
                self._logger.exception('Uncaught exception in sync message handler')

        await self.__receive_messages_helper(self._rep, sync_handler)

    async def _receive_async_messages(self) -> None:
        """Handle broadcast messages arriving on the SUB socket."""

        async def async_handler():
            try:
                msg = Message.deserialize(await self._sub.recv_multipart())
                if self._calc_recv_stats:
                    # NOTE(review): send statistics receive the serialized frames while this
                    # passes the deserialized Message - confirm MsgPerfStats.msg accepts both.
                    self._recv_stats.msg(msg)
                if self._log_messages:
                    self._logger.debug("received async message: type '{}', subtype: '{}'"
                                       .format(msg.type, msg.subtype))
                response = await self.handle_message(msg)
                # a handler may answer a broadcast with another broadcast
                if response is not None:
                    await self.send_message(response)
            except Exception:
                self._logger.exception('Uncaught exception in async message handler')

        await self.__receive_messages_helper(self._sub, async_handler)

    @staticmethod
    async def __receive_messages_helper(socket: zmq.asyncio.Socket,
                                        handler: Callable[[], types.CoroutineType]
                                        ) -> None:
        """
        Poll `socket` forever and await `handler` whenever the socket is readable.

        Two concurrent polling loops are run (for SUB and REP) to avoid one message type
        processing blocking another.

        :param socket: socket to poll for incoming messages
        :param handler: coroutine function called without arguments for every readable event
        """
        poller = zmq.asyncio.Poller()
        poller.register(socket, zmq.POLLIN)
        while True:
            events = await poller.poll(timeout=100)  # timeout in milliseconds
            if socket in dict(events):
                await handler()