Source code for obci.control.peer.peer_config

#!/usr/bin/python
# -*- coding: utf-8 -*-

import warnings

from obci.control.common.config_helpers import (param_name_type_check,
                                                module_id_type_check,
                                                argument_not_empty_check,
                                                reference_split)


[docs]class PeerConfig(object): """ This class represents core configuration of an OpenBCI peer. It is meant to be used both as an output of a config parser and as a configuration holder for the running peer (wrapped in a controller with networking capabilities). Peer configuration consists of local parameters, external parameters, config sources definitions and launch dependencies. Local parameters: param_name : value are owned by the peer itself, whereas external parameters have to be obtained from other peers. External parameter definition: local_parameter_name : config_source_name.parameter_name Config source names are symbolic names for dependencies. In run time real peer ID's are assingned to those names; parameter_name is the remote parameter that is requested from source. Launch dependencies are similar to config sources: the definitions are identical and symbolic name pool is common with config source names i.e. all names are global in the configuration file. Launch dependencies are used when we want to delay peer start until all dependncies report that they are ready to work. """ def __init__(self, peer_id=None, warn_overwrite=False): # system ID of the peer # TODO - is this necessary in config core? self.peer_id = peer_id # keys are source names, values are real peer IDs self._config_sources = {} # keys are symbolic names, values are peer IDs self._launch_deps = {} # the other way: keys are peer IDs, values are lists of source names # to which the ID is assigned self._src_ids = {} # the other way: keys are peer IDs, values are lists of launch # dependencies to which the ID is assigned self._dep_ids = {} # external parameter definitions: keys are local param names, values # are source names and remote parameter names self._ext_param_defs = {} # already acquired external parameters self._set_ext_params = [] # all parameter values, for not-yet-obtained external params # None is stored self._param_values = {} self._warn_overwrite = warn_overwrite def __repr__(self): st = "PEER CONFIG [{0}]".format(self.peer_id) st = ''.join([st, "\nPeer ID: {0}".format(self.peer_id)]) st = ''.join([st, "\nConfig_sources: {0}".format( self._config_sources)]) st = ''.join([st, "\nLaunch dependencies: {0}".format( self._launch_deps)]) st = ''.join([st, "\nExternal param definitions: {0}".format( self._ext_param_defs)]) st = ''.join([st, "\nParameter values: {0}".format( self._param_values)]) st = ''.join([st, "\nAcquired external params: {0}".format( self._set_ext_params)]) return st @property def config_sources(self): """ Configuration sources' symbolic names and their assigned real peer IDs. (A dictionary) """ return self._config_sources @property def launch_deps(self): """ Launch dependencies' symbolic names and their assigned real peer IDs. (A dictionary) """ return self._launch_deps @property def source_ids(self): """ Peer ID's of the config sources and lists of their symbolic names. (A dictionary) """ return self._src_ids @property def dep_ids(self): """ Peer ID's of the launch dependencies and lists of their symbolic names. (A dictionary) """ return self._src_ids @property def param_values(self): """ A dictionary of all parameter values. """ return self._param_values @property def ext_param_defs(self): """ External parameter definitions. A definition consists of a local parameter name and a reference: pair of symbolic source name and remote parameter name. """ return self._ext_param_defs @property def ready_ext_params(self): """ External parameters with values succesfully obtained from sources. """ return self._set_ext_params @property def local_params(self): """ Local parameters and values (dict) """ params = {} for name in self._param_values: if name not in self._ext_param_defs: params[name] = self._param_values[name] return params
[docs] def assign_id_to_name(self, sym_name, peer_id): if sym_name in self._config_sources: self.set_config_source(sym_name, peer_id) elif sym_name in self._launch_deps: self.set_launch_dependency(sym_name, peer_id)
[docs] def set_config_source(self, source_name, peer_id='', _set_dep=True): """ Store a configuration symbolic source name and its real peer_id. Default ID is empty and can be updated later. Warns on overwrite. """ param_name_type_check(source_name) module_id_type_check(peer_id) argument_not_empty_check(source_name) srcs = self._config_sources if source_name in srcs: old_id = srcs[source_name] self._overwrite_warn("""Config source overwrite! \ Name: {0}, old id: {1}, new id: {2}""".format( source_name, old_id, peer_id)) self._src_ids[old_id].remove(source_name) self._config_sources[source_name] = peer_id if peer_id not in self._src_ids: self._src_ids[peer_id] = [] self._src_ids[peer_id].append(source_name) if source_name in self._launch_deps and _set_dep: self.set_launch_dependency(source_name, peer_id, _set_src=False)
[docs] def set_launch_dependency(self, dep_name, peer_id='', _set_src=True): """ """ param_name_type_check(dep_name) module_id_type_check(peer_id) argument_not_empty_check(dep_name) deps = self._launch_deps if dep_name in deps: old_id = deps[dep_name] self._overwrite_warn("""Dependency overwrite! \ Name: {0}, old id: {1}, new id: {2}""".format( dep_name, old_id, peer_id)) self._dep_ids[old_id].remove(dep_name) self._launch_deps[dep_name] = peer_id if peer_id not in self._dep_ids: self._dep_ids[peer_id] = [] self._dep_ids[peer_id].append(dep_name) if dep_name in self._config_sources and _set_src: self.set_config_source(dep_name, peer_id, _set_dep=False)
[docs] def get_param(self, param_name): """ Return value of a parameter param_name. Parameter may be local or external. """ return self._param_values[param_name]
[docs] def has_param(self, param_name): return param_name in self._param_values
[docs] def add_external_param_def(self, param_name, reference): """ Store an external parameter definition. param_name is the local name of the parameter, reference is a string 'source_name.remote_param_name'. Warns about overwriting definitions, especially when a previously local parameter becomes external. """ # TODO - API change, split reference to 2 method params. param_name_type_check(param_name) argument_not_empty_check(param_name) df = [source_name, source_param] = reference_split(reference) if source_name not in self._config_sources: raise ValueError("Source name {0} from reference ('{1}') \ not declared in configuration!".format(source_name, reference)) defs = self._ext_param_defs if param_name in defs: self._overwrite_warn(msg_overwrite_ext_def.format( param_name, defs[param_name], df)) elif param_name in self._param_values: self._overwrite_warn(msg_overwrite_local_to_ext.format( param_name, source_name, source_param)) self._ext_param_defs[param_name] = (source_name, source_param) self._param_values[param_name] = None
[docs] def update_external_param_def(self, param_name, reference): """ Update external parameter definition. Works as add_external_param_def() but the definition should already be stored in PeerConfig. Otherwise raises ValueError. """ self._update_check(param_name) self.add_external_param_def(param_name, reference)
def _set_external_param(self, param_name, value): # Store a value for external parameter if param_name not in self._ext_param_defs: raise ValueError("Parameter {0} not defined as \ external parameter".format(param_name)) else: self._param_values[param_name] = value self._set_ext_params.append(param_name)
[docs] def set_param_from_source(self, src_id, src_param, value): """ Store a value of a remote parameter. src_id - peer ID of the source src_param - remote parameter name value - parameter value """ src_names = self._src_ids[src_id] for src_name in src_names: src_params = self.params_for_source(src_name) if src_param in src_params: self._set_external_param(src_params[src_param], value)
[docs] def add_local_param(self, param_name, value): """ Store a value of a local parameter. Warn when a parameter is overwritten, especially when a previously external parameter becomes local. """ param_name_type_check(param_name) argument_not_empty_check(param_name) if param_name in self._ext_param_defs: self._overwrite_warn(msg_overwrite_ext_to_local.format( param_name, value)) del self._ext_param_defs[param_name] if param_name in self._set_ext_params: del self._set_ext_params[param_name] elif param_name in self._param_values: self._overwrite_warn(msg_overwrite_local.format( param_name, self._param_values[param_name], value)) self._param_values[param_name] = value
[docs] def update_local_param(self, param_name, value): """ Update local parameter. Works like add_local_param() but the parameter should already exist in PeerConfig. Otherwise raise ValueError. """ self._update_check(param_name) return self.add_local_param(param_name, value)
[docs] def set_param(self, param_name, value): return self.update_local_param(param_name, value)
# Helper methods
[docs] def config_ready(self): """ Return True if the configuration is usable, i.e. all used config sources have peer ID's assigned and all external parameter values are already obtained. """ details = {} rd = self.config_sources_ready(details) rd = self.external_params_ready(details) and rd rd = self.launch_deps_ready(details) and rd return rd, details
[docs] def config_sources_ready(self, details=None): """ Return True if all used config sources have peer ID's assigned. """ result = True unused = self.unused_config_sources() if details is not None: details["config_sources"] = [] for src, peer_id in self._config_sources.items(): if not peer_id and src not in unused: if details is not None: details["config_sources"].append((src, peer_id)) result = False return result
[docs] def launch_deps_ready(self, details=None): """ Return True if all launch deps have peer ID's assigned. """ result = True if details is not None: details["launch_deps"] = [] for dep, peer_id in self._launch_deps.items(): if not peer_id: if details is not None: details["launch_deps"].append((dep, peer_id)) result = False return result
[docs] def external_params_ready(self, details=None): """ Return True if the values of all external parameters have been successfully obtained. """ result = True if details is not None: details["ext_params"] = [] for name in self._ext_param_defs.keys(): if name not in self._set_ext_params: if details is not None: details["ext_params"].append((name, self._ext_param_defs[name])) result = False return result
[docs] def params_for_source(self, p_src): """ Return a list of parameters coming from this source (symbolic name). """ params = {} for loc_name, (src, src_par) in self._ext_param_defs.items(): if src == p_src: params[src_par] = loc_name return params
[docs] def unset_params_for_source(self, p_src): """ Return a list of parameters coming from this source (symbolic name) which are not yet set. """ params = {} for loc_name, (src, src_par) in self._ext_param_defs.items(): if src == p_src and loc_name not in self._set_ext_params: params[src_par] = loc_name return params
[docs] def used_config_sources(self): """ Return a list of symbolic source names which are referenced in external parameter definitions. """ used = [reference[0] for reference in self._ext_param_defs.values()] return list(set(used))
[docs] def unused_config_sources(self): """ Return a list of symbolic source names which are NOT referenced in external parameter definitions. """ used = self.used_config_sources() unused = [src for src in self._config_sources.keys() if src not in used] return unused
[docs] def unassigned_config_sources(self): """ Return a list of configuration source names which do not have real peer ID assigned. """ ss = self._config_sources return [src for src in ss.keys() if ss[src] == '']
[docs] def unassigned_launch_deps(self): """ Return a list of launch dependency names which do not have real peer ID assigned. """ deps = self._launch_deps return [dep for dep in deps.keys() if deps[dep] == '']
def _update_check(self, param_name): if param_name not in self._param_values: raise ValueError("Parameter {0} does not exist in configuration of {1},\ cannot update!".format(param_name, self.peer_id)) def _overwrite_warn(self, p_message): if self._warn_overwrite: warnings.warn(ConfigOverwriteWarning(p_message), stacklevel=2)
msg_overwrite_ext_to_local = "Changing external parameter '{0}' to Local! \ New value: {1}." msg_overwrite_local_to_ext = "Changing local parameter {0} to external! \ Source: {1}, src_param: {2}." msg_overwrite_local = "Overwriting local param '{0}'! Old value: {1}, \ new value: {2}" msg_overwrite_ext_def = "External parameter definition overwrite! \ Name: {0}, old ref: {1}, new ref: {2}." # warnings, exceptions
[docs]class ConfigWarning(Warning): def __init__(self, value=None): self.value = value def __str__(self): if self.value is not None: return repr(self.value) else: return repr(self)
[docs]class ConfigOverwriteWarning(ConfigWarning): pass