Source code for obci.core.message_handler_mixin


import asyncio
import types
from typing import Optional, Callable, Union

from obci.core.messages import Message

# TODO: this should be union of types.CoroutineType with proper parameters and Callable
# TODO: wait for proper coroutines support in typing module
HandlerType = Union[Callable[[Message], Optional[Message]], Callable[[Message], types.CoroutineType]]


[docs]class MessageHandlerMixin: def __init__(self) -> None: """ Implements common message handling interface used by Peer and Broker classes. """ self._message_handlers = {} super().__init__()
[docs] def register_message_handler(self, msg_type: str, handler: HandlerType) -> None: """ Register `handler` function to be called when message with `msg_type` arrives. Args: msg_type: Message type string. handler: Function called when new message arrives. """ self._message_handlers[msg_type] = handler
[docs] def unregister_message_handler(self, msg_type: str) -> None: """ Unregister previously registered message handler. Args: msg_type: Message type string. """ del self._message_handlers[msg_type]
[docs] async def handle_message(self, msg: Message) -> Optional[Message]: """ Called by message dispatching loop when new message arrives. Args: msg: message to handle Returns: value returned by handler Raises: KeyError: when handler for `msg.type` is not registered """ response = self._message_handlers[msg.type](msg) if asyncio.iscoroutine(response): return await response else: return response