Source code for obci.control.peer.config_server

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import zmq
import time
import json
import configparser
import os


from obci.mx_legacy.multiplexer_constants import peers, types
from obci.mx_legacy.clients import BaseMultiplexerServer
from obci.control.peer.peer_cmd import PeerCmd
from obci.configs import settings


import obci.control.common.config_message as cmsg
from obci.control.common.obci_control_settings import DEFAULT_SANDBOX_DIR
from obci.control.launcher.launcher_messages import message_templates
from obci.control.common.message import OBCIMessageTool, send_msg

from obci.utils.openbci_logging import get_logger, log_crash


[docs]class ConfigServer(BaseMultiplexerServer): @log_crash def __init__(self, addresses): super(ConfigServer, self).__init__(addresses=addresses, type=peers.CONFIG_SERVER) self._configs = {} self._ext_configs = {} self._ready_peers = [] self.__to_all = False self.mtool = OBCIMessageTool(message_templates) self.launcher_sock = None params, other_params = PeerCmd().parse_cmd() self.addr = params['local_params'].get('launcher_socket_addr', '') self.exp_uuid = params['local_params'].get('experiment_uuid', '') self.log_dir = params['local_params'].get('log_dir', None) self.logger = get_logger('config_server', log_dir=self.log_dir, conn=self.conn, file_level=params['local_params'].get('file_log_level', None), mx_level=params['local_params'].get('mx_log_level', None), stream_level=params['local_params'].get('console_log_level', None)) self._old_configs = self._stored_config() self._restore_peers = params['local_params'].get('restore_peers', '').split() for peer in self._restore_peers: if peer in self._old_configs["local"]: self._configs[peer] = dict(self._old_configs["local"][peer]) self._ready_peers.append(peer) if peer in self._old_configs["ext"]: self._ext_configs[peer] = dict(self._old_configs["ext"][peer]) if self.addr != '': self.ctx = zmq.Context() self.launcher_sock = self.ctx.socket(zmq.PUSH) try: self.launcher_sock.connect(self.addr) except Exception as e: self.logger.error("failed to connect to address " + self.addr + " !!!") self.launcher_sock = None else: self.logger.info("OK: connected to " + self.addr) send_msg(self.launcher_sock, self.mtool.fill_msg("config_server_ready")) self.logger.info("connections count ::::::::: %s", self.conn.connections_count()) def _config_path(self): base_config_path = "config_server.ini" return os.path.abspath(os.path.join(DEFAULT_SANDBOX_DIR, base_config_path)) def _stored_config(self): parser = configparser.RawConfigParser() stored = None storedconf = self._config_path() if not os.path.exists(storedconf): self.logger.info("No config stored %s", str(storedconf)) else: with open(storedconf, 'r') as f: self.logger.info("found stored config %s", str(storedconf)) parser.readfp(f) if not parser.has_option('local_params', 'stored_config'): stored = '' else: stored = parser.get('local_params', 'stored_config') if stored == '': stored = '{}' return json.loads(stored) def _save_config(self): base_config_path = self._config_path() parser = configparser.RawConfigParser() # print "CONFIG_SERVER save path", base_config_path if os.path.exists(base_config_path): with open(base_config_path, 'r') as f: parser.readfp(f) if not parser.has_section('local_params'): parser.add_section('local_params') parser.set('local_params', 'stored_config', json.dumps({"local": self._configs, "ext": self._ext_configs})) parser.set('local_params', 'launcher_socket_addr', '') parser.set('local_params', 'experiment_uuid', '') parser.set('local_params', 'restore_peers', '') with open(base_config_path, 'w') as f: parser.write(f) try: os.chmod(base_config_path, 0o777) except OSError as e: self.logger.error("tried to change permissions to" + base_config_path + "to 777 but" + str(e)) else: self.logger.info("changed permissions to " + base_config_path + " to 777") @log_crash
[docs] def handle_message(self, mxmsg): message = cmsg.unpack_msg(mxmsg.type, mxmsg.message) msg, mtype, launcher_msg = self._call_handler(mxmsg.type, message) if msg is None: self.no_response() else: msg = cmsg.pack_msg(msg) if self.__to_all: self.send_message(message=msg, to=0, type=mtype, flush=True) self.__to_all = False else: self.send_message(message=msg, to=int(mxmsg.from_), type=mtype, flush=True) if launcher_msg is not None and self.launcher_sock is not None: self.logger.info('SENDING msg ' + launcher_msg.decode()[:100] + '[...]') send_msg(self.launcher_sock, launcher_msg) time.sleep(0.1) # TODO - temporary kind-of bug fix...
def _call_handler(self, mtype, message): if mtype == types.GET_CONFIG_PARAMS: return self.handle_get_config_params(message) elif mtype == types.REGISTER_PEER_CONFIG: return self.handle_register_peer_config(message) elif mtype == types.UNREGISTER_PEER_CONFIG: return self.handle_unregister_peer_config(message) elif mtype == types.UPDATE_PARAMS: msg, mtype, launcher_msg = self.handle_update_params(message) if mtype != types.CONFIG_ERROR: self.__to_all = True return msg, mtype, launcher_msg elif mtype == types.PEER_READY: return self.handle_peer_ready(message) elif mtype == types.PEERS_READY_QUERY: return self.handle_peers_ready_query(message) elif mtype == types.LAUNCHER_COMMAND: return self.handle_launcher_command(message) else: return None, None, None
[docs] def handle_get_config_params(self, message_obj): param_owner = message_obj.receiver names = message_obj.param_names if param_owner == 'config_server': params = dict(experiment_uuid=self.exp_uuid) # elif param_owner not in self._configs: # return cmsg.fill_msg(types.CONFIG_ERROR), types.CONFIG_ERROR, None else: # TODO error when param_name does not exist? # params = {} # for name in names: # if name in self._configs[param_owner]: # params[name] = self._configs[param_owner][name] params = self._get_params(param_owner, names) if isinstance(params, tuple): return params mtype = types.CONFIG_PARAMS msg = cmsg.fill_msg(mtype, sender=param_owner) cmsg.dict2params(params, msg) return msg, mtype, None
def _get_params(self, param_owner, names, params=None): if params is None: params = {} self.logger.info("looking for %s, param names=%s" % (param_owner, str(names))) if param_owner not in self._configs: self.logger.info("%s not in %s" % (param_owner, str(self._configs))) return cmsg.fill_msg(types.CONFIG_ERROR), types.CONFIG_ERROR, None for name in names: if name in self._configs[param_owner]: params[name] = self._configs[param_owner][name] elif name in self._ext_configs[param_owner]: owner, name = self._ext_configs[param_owner][name] params = self._get_params(owner, [name], params) return params
[docs] def handle_register_peer_config(self, message_obj): params = cmsg.params2dict(message_obj) ext_params = cmsg.params2dict(message_obj, field_name="ext_params") peer_id = message_obj.sender launcher_msg = None if peer_id in self._configs: mtype = types.CONFIG_ERROR msg = cmsg.fill_msg(mtype) else: self._configs[peer_id] = params self._ext_configs[peer_id] = ext_params mtype = types.PEER_REGISTERED msg = cmsg.fill_msg(mtype, peer_id=peer_id) launcher_msg = self.mtool.fill_msg('obci_peer_registered', peer_id=peer_id, params=params) self._save_config() return msg, mtype, launcher_msg
[docs] def handle_unregister_peer_config(self, message_obj): self._configs.pop(message_obj.peer_id) if message_obj.peer_id in self._ready_peers: self._ready_peers.remove(message_obj.peer_id) self._save_config() return None, None, None # TODO confirm unregister...
[docs] def handle_update_params(self, message_obj): params = cmsg.params2dict(message_obj) param_owner = message_obj.sender if param_owner not in self._configs: launcher_msg = None return cmsg.fill_msg(types.CONFIG_ERROR, error_str="Peer unknown: {0}".format(param_owner)),\ types.CONFIG_ERROR,\ launcher_msg updated = {} for param in params: if param in self._configs[param_owner]: self._configs[param_owner][param] = params[param] updated[param] = params[param] if updated: mtype = types.PARAMS_CHANGED msg = cmsg.fill_msg(types.PARAMS_CHANGED, sender=param_owner) cmsg.dict2params(updated, msg) launcher_msg = self.mtool.fill_msg('obci_peer_params_changed', peer_id=param_owner, params=updated) self._save_config() return msg, mtype, launcher_msg return None, None, None
[docs] def handle_peer_ready(self, message_obj): peer_id = message_obj.peer_id if peer_id not in self._configs: return cmsg.fill_msg(types.CONFIG_ERROR), types.CONFIG_ERROR, None self._ready_peers.append(peer_id) launcher_msg = self.mtool.fill_msg('obci_peer_ready', peer_id=peer_id) return message_obj, types.PEER_READY, launcher_msg
[docs] def handle_peers_ready_query(self, message_obj): peer_id = message_obj.sender if peer_id not in self._configs: return cmsg.fill_msg(types.CONFIG_ERROR), types.CONFIG_ERROR, None green_light = True for dep in message_obj.deps: if dep not in self._ready_peers: green_light = False return cmsg.fill_msg(types.READY_STATUS, receiver=peer_id, peers_ready=green_light), types.READY_STATUS, None
[docs] def handle_launcher_command(self, message_obj): return None, None, message_obj.serialized_msg
def _crash_extra_tags(self, exception=None): return {'obci_part': 'obci'}
if __name__ == "__main__": ConfigServer(settings.MULTIPLEXER_ADDRESSES).loop() # TODO make doctests from this """ srv = ConfigServer(settings.MULTIPLEXER_ADDRESSES) print "REGISTRATION" reg = cmsg.fill_msg(types.REGISTER_PEER_CONFIG, sender="ja", receiver="") cmsg.dict2params(dict(wr=1, dfg=[1,2,3,4,'zuzanna']), reg) srv.handle_register_peer_config(reg) reg = cmsg.fill_msg(types.REGISTER_PEER_CONFIG, sender="ty", receiver="") cmsg.dict2params(dict(a=1, bb=['ssdfsdf', 'LOL']), reg) srv.handle_register_peer_config(reg) reg = cmsg.fill_msg(types.REGISTER_PEER_CONFIG, sender="on", receiver="") cmsg.dict2params(dict(lll=0), reg) srv.handle_register_peer_config(reg) print srv._configs print "PEER_READY" rdy = cmsg.fill_msg(types.PEER_READY, peer_id="on") srv.handle_peer_ready(rdy) rdy = cmsg.fill_msg(types.PEER_READY, peer_id="ja") srv.handle_peer_ready(rdy) print srv._ready_peers print "PEERS_READY_QUERY" rdq = cmsg.fill_msg(types.PEERS_READY_QUERY, sender="ja", deps=["on, ty"]) print srv.handle_peers_ready_query(rdq)[0] rdq = cmsg.fill_msg(types.PEERS_READY_QUERY, sender="ty", deps=["on"]) print srv.handle_peers_ready_query(rdq)[0] print "GET_CONFIG_PARAMS" par = cmsg.fill_msg(types.GET_CONFIG_PARAMS, sender="ja", receiver="ty", param_names=['a','b']) print srv.handle_get_config_params(par)[0] par = cmsg.fill_msg(types.GET_CONFIG_PARAMS, sender="ja", receiver="ty", param_names=['bb']) rep = srv.handle_get_config_params(par)[0] print rep, "decoded params:\n", cmsg.params2dict(rep) print "DEREGISTRATION" unr = cmsg.fill_msg(types.UNREGISTER_PEER_CONFIG, peer_id="ja") srv.handle_unregister_peer_config(unr) print srv._configs """