#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import numbers
import codecs
import logging
from . import peer_config
from . import peer_config_parser
from obci.control.peer.peer_cmd import PeerCmd
from .config_defaults import CONFIG_DEFAULTS
import obci.control.common.config_message as cmsg
from obci.configs import settings
from obci.utils.openbci_logging import log_crash
from obci.mx_legacy.multiplexer_constants import peers, types
from obci.mx_legacy.clients import connect_client
from functools import reduce
CONFIG_FILE_EXT = 'ini'
WAIT_READY_SIGNAL = "wait_ready_signal"
CONFIG_FILE = "config_file"
PEER_ID = "peer_id"
BASE_CONF = "base_config_file"
LOGGER = logging.getLogger("peer_control_default_logger")
[docs]class PeerControl(object):
def __init__(self, peer=None, connection=None, param_validate_method=None,
param_change_method=None):
self.core = peer_config.PeerConfig()
self.peer = peer
self.peer_validate_params = param_validate_method
self.peer_params_changed = param_change_method
self.peer_id = None
self.base_config_path = None
self.connection = connection
self.query_conn = connect_client(type=peers.CONFIGURER,
addresses=settings.MULTIPLEXER_ADDRESSES)
self.cmd_overrides = {}
self.file_list = []
self.logger = LOGGER
# parse command line
self.process_command_line()
self._load_provided_configs()
if self.peer_validate_params:
if self.connection:
self.initialize_config(self.connection)
else:
self.initialize_config_locally()
[docs] def initialize_config(self, connection):
# self._load_provided_configs()
self._request_ext_params(connection)
# self.peer_validate_params(self.core.param_values)
return self.config_ready()
[docs] def initialize_config_locally(self):
# self._load_provided_configs()
# self.peer_validate_params(self.core.param_values)
return self.config_ready()
def _load_provided_configs(self):
# parse default config file
self._load_config_base()
self._load_defaults(CONFIG_DEFAULTS)
# parse external config file
self._load_config_external()
# parse other config files (names from command line)
for filename in self.file_list:
self._load_config_from_file(filename, CONFIG_FILE_EXT, update=True)
# parse overrides (from command line)
dictparser = peer_config_parser.parser('python')
dictparser.parse(self.cmd_overrides, self.core, update=True)
def _load_defaults(self, globals_):
for param, val in globals_.items():
self.core.add_local_param(param, val)
[docs] def process_command_line(self):
cmd_ovr, other_params = PeerCmd().parse_cmd()
self.peer_id = self.core.peer_id = other_params[PEER_ID]
self.base_config_path = other_params[BASE_CONF]
if other_params[CONFIG_FILE] is not None:
self.file_list = other_params[CONFIG_FILE]
self.cmd_overrides = cmd_ovr
def _load_config_external(self):
"""Parse the external configuration file, provided by peer"""
if self.peer is None:
raise NoPeerError
if self.peer.external_config_file is not None:
config_path = self.peer.external_config_file.rsplit('.', 1)[0]
config_path = '.'.join([config_path, CONFIG_FILE_EXT])
self._load_config_from_file(config_path, CONFIG_FILE_EXT)
def _load_config_base(self):
"""Parse the base configuration file, named the same as peer's
implementation file"""
if self.peer is None:
raise NoPeerError
# print "Peer {0} base config path: {1}".format(self.peer_id, base_config_path)
self._load_config_from_file(self.base_config_path, CONFIG_FILE_EXT)
# print "Peer {0} base config: {1}".format(self.peer_id, self.core)
def _load_config_from_file(self, p_path, p_type, update=False):
with codecs.open(p_path, "r", "utf8") as f:
parser = peer_config_parser.parser(p_type)
parser.parse(f, self.core)
[docs] def handle_config_message(self, mxmsg):
if mxmsg.type in cmsg.MX_CFG_MESSAGES:
message = cmsg.unpack_msg(mxmsg.type, mxmsg.message)
msg, mtype = self._call_handler(mxmsg.type, message)
if msg is None:
self.peer.no_response()
else:
msg = cmsg.pack_msg(msg)
self.peer.send_message(message=msg, type=mtype, to=int(mxmsg.from_), flush=True)
def _call_handler(self, mtype, message):
if mtype == types.PARAMS_CHANGED:
return self._handle_params_changed(message)
elif mtype == types.PEER_READY_SIGNAL:
return self._handle_peer_ready_signal(message)
elif mtype == types.SHUTDOWN_REQUEST:
self.peer.shut_down()
# return None, None
else:
return None, None
def _handle_params_changed(self, p_msg):
self.logger.info("PARAMS CHANGED - %s" % p_msg.sender)
params = cmsg.params2dict(p_msg)
param_owner = p_msg.sender
old_values = {}
updated = {}
if param_owner in self.core.config_sources:
src_params = self.core.params_for_source(param_owner)
for par_name in [par for par in params if par in src_params]:
old = self.core.get_param(par_name)
new = params[par_name]
if old != params[par_name]:
old_values[par_name] = old
updated[par_name] = new
self.core.set_param_from_source(p_msg.sender, par_name, new)
if not self.peer_params_changed(updated):
# restore...
for par, val in old_values.items():
self.core.set_param_from_source(p_msg.sender, par, val)
if param_owner == self.peer_id:
local_params = self.core.local_params
for par, val in params.items():
if par not in local_params:
# protest?
continue
if val != self.core.get_param(par):
old_values[par] = self.core.get_param(par)
updated[par] = val
self.core.update_local_param(par, val)
if not self.peer_params_changed(updated):
for par, val in old_values.items():
self.core.update_local_param(par, val)
return None, None
[docs] def config_ready(self):
rd, details = self.core.config_ready()
return rd and self.peer_id is not None, details
def _handle_peer_ready_signal(self, p_msg):
if not self.peer.ready_to_work and self.config_ready():
self.peer.ready_to_work = True
self.send_peer_ready(self.peer.conn)
return None, None
else:
return cmsg.fill_msg(types.CONFIG_ERROR), types.CONFIG_ERROR
@log_crash
[docs] def get_param(self, p_name):
return self.core.get_param(p_name)
@log_crash
[docs] def has_param(self, p_name):
return self.core.has_param(p_name)
@log_crash
[docs] def set_param(self, p_name, p_value):
result = self.core.update_local_param(p_name, p_value)
# TODO let know other peers...
if self.query_conn:
msg = cmsg.fill_msg(types.UPDATE_PARAMS, sender=self.peer_id)
params = {p_name: p_value}
cmsg.dict2params(params, msg)
self.__query(self.query_conn, msg,
types.UPDATE_PARAMS)
# self.connection.send_message(message=msg, type=types.UPDATE_PARAMS)
val_short = str(p_value)[:300] + '[...]'
self.logger.info(' param update:: %s %s', p_name, val_short)
else:
self.logger.warning('param updated locally %s, %s, %s',
p_name, val_short, str(result))
return result
[docs] def param_values(self):
return self.core.param_values
[docs] def true_val(self, value):
if isinstance(value, numbers.Number):
return value > 0
return str(value).lower() in ['1', 'true', 'yes', 't', 'y']
[docs] def register_config(self, connection):
if self.peer is None:
raise NoPeerError()
msg = cmsg.fill_msg(types.REGISTER_PEER_CONFIG, sender=self.peer_id)
params = self.core.local_params
cmsg.dict2params(params, msg)
ext_params = self.core.ext_param_defs
# register also external param definitions: param_name <---> (peer_id_of_config_source, param_name)
for par in ext_params:
ext_def = ext_params[par]
symname = ext_def[0]
ext_params[par] = (self.core.config_sources[symname], ext_def[1])
cmsg.dict2params(ext_params, msg, field_name="ext_params")
# connection.send_message(message=msg, type=types.REGISTER_PEER_CONFIG)
reply = self.__query(connection, cmsg.pack_msg(msg),
types.REGISTER_PEER_CONFIG)
# print 'AAAAAAAAAAAAAAAAAA', reply, "(rq:", types.REGISTER_PEER_CONFIG,\
# "exp:", types.PEER_REGISTERED, ')'
if reply is None:
self.logger.error('config registration unsuccesful!!!! %s',
str(reply))
elif not reply.type == types.PEER_REGISTERED:
self.logger.error('config registration unsuccesful!!!! %s',
str(reply))
def _request_ext_params(self, connection, retries=400):
# TODO set timeout and retry count
self.logger.info("requesting external parameters")
if self.peer is None:
raise NoPeerError
def _unset_param_count():
return reduce(lambda x, y: x + y,
[len(self.core.unset_params_for_source(src))
for src in self.core.used_config_sources()], 0)
ready, details = self.core.config_ready()
while not ready and retries:
for src in self.core.used_config_sources():
params = list(self.core.unset_params_for_source(src).keys())
msg = cmsg.fill_msg(types.GET_CONFIG_PARAMS,
sender=self.peer_id,
param_names=params,
receiver=self.core.config_sources[src])
# print "requesting: {0}".format(msg)
reply = self.__query(connection, cmsg.pack_msg(msg),
types.GET_CONFIG_PARAMS)
if reply is None:
# raise something?
continue
if reply.type == types.CONFIG_ERROR:
self.logger.warning("peer {0} has not yet started".format(msg.receiver))
elif reply.type == types.CONFIG_PARAMS:
reply_msg = cmsg.unpack_msg(reply.type, reply.message)
params = cmsg.params2dict(reply_msg)
for par, val in params.items():
self.core.set_param_from_source(reply_msg.sender, par, val)
else:
self.logger.error("WTF? {0}".format(reply.message))
# print '.',#"{0} external params still unset".format(_unset_param_count())
time.sleep(0.4)
ready, details = self.core.config_ready()
retries -= 1
if ready:
self.logger.info("External parameters initialised %s",
str(self.core.config_ready()))
return ready, details
[docs] def send_peer_ready(self, connection):
if self.peer is None:
raise NoPeerError
self.logger.info('sending ready signal.')
mtype = types.PEER_READY
msg = cmsg.fill_and_pack(mtype, peer_id=self.peer_id)
self.__query(connection, msg, mtype)
self._synchronize_ready(connection)
def _synchronize_ready(self, connection):
# TODO set timeout and retry count
if self.peer is None:
raise NoPeerError
others = list(self.core.launch_deps.values())
self.logger.info('waiting for other peers %s: ', str(others))
msg = cmsg.fill_and_pack(types.PEERS_READY_QUERY, sender=self.peer_id,
deps=others)
ready = False
while not ready:
reply = self.__query(connection, msg, types.PEERS_READY_QUERY)
# print 'got!', reply, cmsg.unpack_msg(reply.type, reply.message)
if reply is None:
# TODO sth bad happened, raise exception?
continue
if reply.type == types.READY_STATUS:
rmsg = cmsg.unpack_msg(reply.type, reply.message)
ready = rmsg.peers_ready
if not ready:
time.sleep(2)
self.logger.info("Dependencies are ready, I can start working")
def __query(self, conn, msg, msgtype):
try:
reply = conn.query(message=msg,
type=msgtype)
except Exception:
self.logger.exception("Query failed")
reply = None
return reply
# todo message verification
[docs]class PeerConfigControlError(Exception):
def __init__(self, value=None):
self.value = value
def __str__(self):
if self.value is not None:
return repr(self.value)
else:
return repr(self)
[docs]class ConfigNotReadyError(PeerConfigControlError):
pass
[docs]class NoPeerError(PeerConfigControlError):
pass
[docs]class PeerConfigControlWarning(Warning):
pass
[docs]class UnsupportedMessageType(PeerConfigControlWarning):
pass