# Source code for obci.control.common.config_message

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from obci.mx_legacy.multiplexer_constants import types

import obci.control.common.cfg_messages_pb2 as templates

import json
from google.protobuf.message import Message
from functools import reduce

# Lookup table: multiplexer message type constant (from
# ``multiplexer_constants.types``) -> message template class (from
# ``cfg_messages_pb2``).  ``msg_for_type`` instantiates the class found here.
# Several message types share the same template class.
MX_CFG_MESSAGES = {
    types.GET_CONFIG_PARAMS: templates.ConfigParamsRequest,
    types.CONFIG_PARAMS: templates.ConfigParams,
    types.REGISTER_PEER_CONFIG: templates.ConfigParams,
    types.UNREGISTER_PEER_CONFIG: templates.PeerIdentity,
    types.PEER_REGISTERED: templates.PeerIdentity,
    types.UPDATE_PARAMS: templates.ConfigParams,
    types.PARAMS_CHANGED: templates.ConfigParams,
    types.PEER_READY: templates.PeerIdentity,
    types.PEERS_READY_QUERY: templates.PeerReadyQuery,
    types.READY_STATUS: templates.PeerReadyStatus,
    types.CONFIG_ERROR: templates.ConfigError,
    types.PEER_READY_SIGNAL: templates.PeerIdentity,
    types.LAUNCHER_COMMAND: templates.LauncherCommand
}


def msg_for_type(mx_type):
    """Return a fresh message object for the given multiplexer type.

    Looks ``mx_type`` up in ``MX_CFG_MESSAGES`` and instantiates the
    registered template class with no arguments.

    :param mx_type: multiplexer message type constant.
    :return: a new message instance, or ``None`` when ``mx_type`` has no
        registered template.
    """
    template = MX_CFG_MESSAGES.get(mx_type)
    if template is None:
        return None
    return template()


def unpack_msg(mx_type, msg_str):
    """Deserialize ``msg_str`` into the message template for ``mx_type``.

    :param mx_type: multiplexer message type constant; the template is
        obtained from ``msg_for_type``.
    :param msg_str: serialized message, handed to ``ParseFromString``.
    :return: the populated message object.
    :raises ConfigMessageError: if no template is registered for ``mx_type``.
    """
    msg = msg_for_type(mx_type)
    if msg is None:
        # Name the offending type so the failure can be diagnosed from the
        # traceback alone.
        raise ConfigMessageError(
            "no config message template for multiplexer type {!r}".format(
                mx_type))
    msg.ParseFromString(msg_str)
    return msg


def fill_msg(mx_type, **kwargs):
    """Create a message of type ``mx_type`` and fill it from ``kwargs``.

    This does NOT encode any values to json! The keyword values are handed
    to ``__pb2_construct`` as they are.

    :param mx_type: multiplexer message type constant; the template is
        obtained from ``msg_for_type``.
    :param kwargs: attribute name -> value pairs to set on the message.
    :return: the filled message object.
    :raises ConfigMessageError: if no template is registered for ``mx_type``.
    """
    msg = msg_for_type(mx_type)
    if msg is None:
        # Name the offending type so the failure can be diagnosed from the
        # traceback alone.
        raise ConfigMessageError(
            "no config message template for multiplexer type {!r}".format(
                mx_type))
    return __pb2_construct(msg, **kwargs)


def pack_msg(msg):
    """Serialize ``msg`` and return the result.

    :param msg: object exposing ``SerializeToString()``.
    :return: whatever ``msg.SerializeToString()`` returns.
    """
    serialized = msg.SerializeToString()
    return serialized


def fill_and_pack(mx_type, **kwargs):
    """Build a message of type ``mx_type`` from ``kwargs`` and serialize it.

    Equivalent to ``pack_msg(fill_msg(mx_type, **kwargs))``.

    :param mx_type: multiplexer message type constant.
    :param kwargs: attribute name -> value pairs forwarded to ``fill_msg``.
    :return: the serialized message.
    """
    filled = fill_msg(mx_type, **kwargs)
    return pack_msg(filled)


def val2str(value):
    """Encode ``value`` as a JSON string (via ``json.dumps``)."""
    encoded = json.dumps(value)
    return encoded


def str2val(string):
    """Decode the JSON document ``string`` (via ``json.loads``)."""
    decoded = json.loads(string)
    return decoded


def __pb2_construct(what, **kwargs):
    """Populate the message ``what`` from keyword arguments and return it.

    Each keyword names an attribute of ``what``:

    * a ``list`` value has its items appended, one by one, to the existing
      attribute;
    * a ``Message`` value is copied into the existing attribute with
      ``CopyFrom``;
    * any other value is assigned to the attribute directly.

    ``what`` is modified in place and the same object is returned.
    """
    # copied from k2launcher.utils
    # TODO cleaner
    for field, value in kwargs.items():
        if isinstance(value, list):
            container = getattr(what, field)
            for item in value:
                container.append(item)
        elif isinstance(value, Message):
            getattr(what, field).CopyFrom(value)
        else:
            setattr(what, field, value)
    return what


def __msg_to_dict(msg, val_f=None):
    """Return ``{field.name: value}`` for every pair in ``msg.ListFields()``.

    :param msg: object whose ``ListFields()`` yields (field, value) pairs.
    :param val_f: optional callable applied to each value; when falsy the
        values are stored unchanged.
    :return: a new dictionary keyed by field name.
    """
    convert = val_f if val_f else (lambda value: value)
    return {pair[0].name: convert(pair[1]) for pair in msg.ListFields()}


def msg_to_dict(msg, val_f=None):
    """Return the fields of ``msg`` as a dictionary keyed by field name.

    Does not convert nested messages!

    :param msg: message to convert.
    :param val_f: optional callable applied to every field value.
    :return: the dictionary produced by ``__msg_to_dict``.
    """
    return __msg_to_dict(msg, val_f)


def __param2msg(param, value):
    """Return a new ``templates.Param`` with ``name=param`` and ``value=value``."""
    param_msg = templates.Param()
    return __pb2_construct(param_msg, name=param, value=value)


def params2dict(config_params_msg, field_name="params"):
    """Collect the parameters of ``config_params_msg`` into a dictionary.

    Every entry of the ``field_name`` attribute contributes one item: its
    ``name`` is the key and its ``value``, decoded from json with
    ``str2val``, is the value.

    :param config_params_msg: message that should have a ``field_name``
        ('params' by default) attribute.
    :param field_name: name of the attribute holding the parameter entries.
    :return: dictionary of decoded parameters; empty when the message has no
        such attribute.
    """
    if not hasattr(config_params_msg, field_name):
        return {}
    entries = getattr(config_params_msg, field_name)
    return {entry.name: str2val(entry.value) for entry in entries}


def dict2params(dic, config_params_msg, field_name="params"):
    """Store the items of ``dic`` as parameter entries of ``config_params_msg``.

    For every key a new entry is created with ``.add()`` on the
    ``field_name`` attribute; its ``name`` is the key and its ``value`` is
    the dictionary value encoded to json with ``val2str``.

    :param dic: dictionary of parameter name -> value.
    :param config_params_msg: message that should have a ``field_name``
        ('params' by default) attribute; modified in place.
    :param field_name: name of the attribute receiving the entries.
    """
    for name, raw_value in dic.items():
        entry = getattr(config_params_msg, field_name).add()
        entry.name = name
        entry.value = val2str(raw_value)


def launcher_cmd(sender_id, encoded_launcher_msg):
    """Wrap an encoded launcher message in a packed LAUNCHER_COMMAND message.

    :param sender_id: stored in the ``sender`` attribute of the message.
    :param encoded_launcher_msg: stored in the ``serialized_msg`` attribute.
    :return: tuple ``(packed_message, types.LAUNCHER_COMMAND)``.
    """
    packed = fill_and_pack(
        types.LAUNCHER_COMMAND,
        sender=sender_id,
        serialized_msg=encoded_launcher_msg,
    )
    return packed, types.LAUNCHER_COMMAND


class ConfigMessageError(Exception):
    """Raised by ``unpack_msg`` and ``fill_msg`` when a message type has no
    registered template."""


if __name__ == '__main__':
    # Ad-hoc demonstration: build a few messages and print them together
    # with their dictionary forms.
    ready = fill_msg(types.PEER_READY, peer_id="dupa")
    print(ready)
    print(msg_to_dict(ready))

    request = fill_msg(
        types.GET_CONFIG_PARAMS,
        sender="ja",
        receiver="ty",
        param_names=["par1", "par2"],
    )
    print(msg_to_dict(request))
    print(request)

    # Round-trip a plain dictionary through the 'params' field.
    config = fill_msg(types.CONFIG_PARAMS, sender="ja", receiver="ty")
    dict2params({"asdf": 123, "sdfg": "alalala"}, config)
    print(params2dict(config))