Source code for obci.core.messages


import abc
import json
from typing import Any, List

from . import types


[docs]class MessageSerializer(metaclass=abc.ABCMeta): """ Message serializer implements two methods `serialize` and `deserialize` to convert to and from bytes to desired data type. """ @staticmethod @abc.abstractmethod
[docs] def serialize(data: Any) -> bytes: raise Exception('Must be reimplemented in subclass')
@staticmethod @abc.abstractmethod
[docs] def deserialize(data: bytes) -> Any: raise Exception('Must be reimplemented in subclass')
[docs]class NullMessageSerializer(MessageSerializer): """ Serialize data to empty (None) object. `deserialize` method always returns `None`. """ @staticmethod
[docs] def serialize(data): return b''
@staticmethod
[docs] def deserialize(data): return None
[docs]class StringMessageSerializer(MessageSerializer): """ Serializes strings. """ @staticmethod
[docs] def serialize(data): return data.encode('utf-8')
@staticmethod
[docs] def deserialize(data): return data.decode('utf-8')
[docs]class JsonMessageSerializer(MessageSerializer): """ Serializes any JSON-serializable objects. """ @staticmethod
[docs] def serialize(data): return json.dumps(data, ensure_ascii=True, separators=(',', ':')).encode('ascii')
@staticmethod
[docs] def deserialize(data): return json.loads(data.decode('ascii'))
[docs]class NoSerializerRegistered(Exception): """ Raised when no serializer was registered for used message type. """
[docs]class Message: serializers = {} def __init__(self, type_id: str, subtype_id: str='', data: Any=None): """ Message consists of: * type - can be any string * subtype - can be any string * data - deserialized message payload New message type must be registered using `Message.register_serializer` method before creating `Message` objects with such type. :param type_id: type of this message :param subtype_id: usually interpreted as sender peer ID :param data: message payload """ super().__init__() self._type = str(type_id) self._subtype = str(subtype_id) self.data = data # type is read only @property def type(self) -> str: return self._type @property def subtype(self) -> str: return self._subtype @subtype.setter def subtype(self, val: str) -> None: self._subtype = str(val) @staticmethod def __get_serializer(msg_type: str): try: return Message.serializers[types.QUERY if msg_type.endswith(types.QUERY) else msg_type] except KeyError: raise NoSerializerRegistered("No serializer for '{}'.".format(msg_type))
[docs] def serialize(self) -> List[bytes]: """ Serialize to ZMQ multipart message. :return: ZMQ multipart message """ serializer = Message.__get_serializer(self.type) data_bytes = serializer.serialize(self.data) return ['{}^{}'.format(self.type, self.subtype).encode('utf-8'), data_bytes]
@staticmethod
[docs] def deserialize(msg: List[bytes]) -> 'Message': """ Create `Message` object from ZMQ multipart message. :param msg: multipart message received by ZMQ :return: Message object """ if len(msg) != 2: raise Exception('Invalid message format') try: type_id, subtype_id = msg[0].decode('utf-8').split('^', maxsplit=1) except Exception: raise Exception('Invalid message format: invalid type or subtype') serializer = Message.__get_serializer(type_id) data = serializer.deserialize(msg[1]) return Message(type_id, subtype_id, data)
@staticmethod
[docs] def register_serializer(msg_type: str, serializer_class) -> None: """ Unregister serializer for specified message type. :param msg_type: message type :param serializer_class: class derived from MessageSerializer """ Message.serializers[msg_type] = serializer_class()
@staticmethod
[docs] def unregister_serializer(msg_type: str) -> None: """ Unregister serializer for specified message type. :param msg_type: message type """ del Message.serializers[msg_type]
# # serializers for predefined message types # Message.register_serializer(types.INVALID_REQUEST, StringMessageSerializer) Message.register_serializer(types.INTERNAL_ERROR, StringMessageSerializer) Message.register_serializer(types.HEARTBEAT, NullMessageSerializer) Message.register_serializer(types.PEERS_READY, NullMessageSerializer) Message.register_serializer(types.PEERS_READY_RECEIVED, NullMessageSerializer) Message.register_serializer(types.OK, NullMessageSerializer) Message.register_serializer(types.ERROR, StringMessageSerializer) Message.register_serializer(types.QUERY, JsonMessageSerializer) Message.register_serializer(types.REDIRECT, JsonMessageSerializer) Message.register_serializer(types.BROKER_HELLO, JsonMessageSerializer) Message.register_serializer(types.BROKER_HELLO_RESPONSE, JsonMessageSerializer) Message.register_serializer(types.BROKER_REGISTER_PEER, JsonMessageSerializer) Message.register_serializer(types.BROKER_REGISTER_PEER_RESPONSE, JsonMessageSerializer) Message.register_serializer(types.BROKER_HEARTBEAT, JsonMessageSerializer) Message.register_serializer(types.BROKER_HEARTBEAT_RESPONSE, JsonMessageSerializer) Message.register_serializer(types.BROKER_REGISTER_QUERY_HANDLER, JsonMessageSerializer) Message.register_serializer(types.BROKER_UNREGISTER_QUERY_HANDLER, JsonMessageSerializer)