#!/usr/bin/python
# -*- coding: utf-8 -*-
import codecs
import logging
from obci.control.common.config_helpers import (CONFIG_SOURCES,
LAUNCH_DEPENDENCIES,
LOCAL_PARAMS,
EXT_PARAMS)
import obci.control.launcher.launcher_tools as launcher_tools
from obci.control.peer.peer_config_serializer import PeerConfigSerializerCmd
from obci.control import peer
from obci.control.peer.peer_config import PeerConfig
from obci.control.common.graph import Graph, Vertex
[docs]class OBCIExperimentConfig(object):
def __init__(self, launch_file_path=None, uuid=None,
origin_machine=None, logger=None):
self.uuid = uuid
self.launch_file_path = launch_file_path
self.origin_machine = origin_machine if origin_machine else ''
self.scenario_dir = ''
self.mx = 0
self.peers = {}
self.logger = logger or logging.getLogger("ObciExperimentConfig")
@property
def launch_file_path(self):
return self._launch_file_path
@launch_file_path.setter
def launch_file_path(self, path):
self._launch_file_path = path
if path:
self._launch_file_path = launcher_tools.obci_root_relative(path)
[docs] def peer_config(self, peer_id):
return self.peers[peer_id].config
[docs] def update_local_param(self, peer_id, p_name, p_value):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
return self.peers[peer_id].config.update_local_param(p_name, p_value)
[docs] def update_external_param(self, peer_id, p_name, src, src_param):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
return self.peers[peer_id].config.update_external_param_def(p_name, src + '.' + src_param)
[docs] def update_peer_config(self, peer_id, config_dict):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
conf = self.peers[peer_id].config
dictparser = peer.peer_config_parser.parser('python')
return dictparser.parse(config_dict, conf, update=True)
[docs] def file_update_peer_config(self, peer_id, file_path):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
parser = peer.peer_config_parser.parser('ini')
with open(file_path) as f:
return parser.parse(f, self.peers[peer_id].config, update=True)
[docs] def peer_path(self, peer_id):
return self.peers[peer_id].path
[docs] def peer_machine(self, peer_id):
return self.peers[peer_id].machine
[docs] def update_peer_machine(self, peer_id, machine_ip):
self.peers[peer_id].machine = machine_ip
[docs] def extend_with_peer(self, peer_id, peer_path, peer_cfg,
config_sources=None, launch_deps=None,
param_overwrites=None, machine=None):
override = peer_id in self.peers
machine = machine or ""
self.add_peer(peer_id)
self.set_peer_config(peer_id, peer_cfg)
self.set_peer_path(peer_id, peer_path)
self.set_peer_machine(peer_id, machine)
if config_sources:
for src_name, src_id in config_sources.items():
self.set_config_source(peer_id, src_name, src_id)
else:
for src in peer_cfg.config_sources:
if src in self.peers:
self.set_config_source(peer_id, src, src)
if launch_deps:
for dep_name, dep_id in launch_deps.items():
self.set_launch_dependency(peer_id, dep_name, dep_id)
else:
for dep in peer_cfg.launch_deps:
if dep in self.peers:
self.set_launch_dependency(peer_id, dep, dep)
if param_overwrites:
for par, val in param_overwrites.items():
self.update_local_param(peer_id, par, val)
return override
[docs] def add_peer(self, peer_id):
self.peers[peer_id] = PeerConfigDescription(peer_id, self.uuid)
[docs] def set_peer_config(self, peer_id, peer_config):
self.peers[peer_id].config = peer_config
[docs] def set_peer_path(self, peer_id, path):
self.peers[peer_id].path = path
[docs] def set_config_source(self, peer_id, src_name, src_peer_id):
if src_peer_id and src_peer_id not in self.peers:
raise OBCISystemConfigError("(src) Peer ID {0} not in peer list".format(src_peer_id))
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
if self.peers[peer_id] is None:
raise OBCISystemConfigError("Configuration for peer ID {0} does not exist".format(peer_id))
self.peers[peer_id].config.set_config_source(src_name, src_peer_id)
[docs] def set_launch_dependency(self, peer_id, dep_name, dep_peer_id):
if dep_peer_id not in self.peers:
raise OBCISystemConfigError("(dep) Peer ID {0} not in peer list".format(dep_peer_id))
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
if self.peers[peer_id] is None:
raise OBCISystemConfigError("Configuration for peer ID {0} does not exist".format(peer_id))
self.peers[peer_id].config.set_launch_dependency(dep_name, dep_peer_id)
[docs] def set_peer_machine(self, peer_id, machine_name):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
self.peers[peer_id].machine = machine_name
[docs] def all_param_values(self, peer_id):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
config = self.peers[peer_id].config
not_fresh = config.param_values
vals = {}
for key in not_fresh:
vals[key] = self._param_value(peer_id, key, config)
return vals
[docs] def local_params(self, peer_id):
return self.peers[peer_id].config.local_params
[docs] def param_value(self, peer_id, param_name):
if peer_id not in self.peers:
raise OBCISystemConfigError("Peer ID {0} not in peer list".format(peer_id))
config = self.peers[peer_id].config
return self._param_value(peer_id, param_name, config)
def _param_value(self, peer_id, param_name, config):
if param_name in config.local_params:
return config.local_params[param_name]
elif param_name in config.ext_param_defs:
peer, param = config.ext_param_defs[param_name]
source = config.config_sources[peer]
return self.param_value(source, param)
else:
raise OBCISystemConfigError("Param {0} does not exist in {1}".format(param_name, peer_id))
[docs] def config_ready(self):
details = {}
if not self.peers:
return False, details
for peer_state in self.peers.values():
if not peer_state.ready(details):
return False, details
res, det = self.launch_deps_graph_ok()
if not res:
return res, det
res, det = self.config_sources_graph_ok()
if not res:
return res, det
return True, {}
[docs] def launch_deps_graph_ok(self):
gr = self.peer_graph('list_launch_deps')
res, order = gr.topo_sort()
details = '' if res else {'desc': "Launch dependencies graph contains a cycle!!!"}
return res, details
[docs] def config_sources_graph_ok(self):
gr = self.peer_graph('list_config_sources')
res, order = gr.topo_sort()
details = '' if res else {'desc': "Config sources graph contains a cycle!!!"}
return res, details
[docs] def status(self, status_obj):
ready, details = self.config_ready()
st = launcher_tools.READY_TO_LAUNCH if ready else launcher_tools.NOT_READY
status_obj.set_status(st, details=details)
# TODO details, e.g. info about cycles
for peer_id in self.peers:
pst = status_obj.peers_status[peer_id] = launcher_tools.PeerStatus(peer_id)
self.peers[peer_id].status(pst)
[docs] def peer_machines(self):
machines = set([self.origin_machine])
for peer in self.peers.values():
if peer.machine:
machines.add(peer.machine)
return list(machines)
[docs] def launch_data(self, peer_machine):
ldata = {}
for peer in self.peers.values():
machine = peer.machine if peer.machine else self.origin_machine
if machine == peer_machine:
ldata[peer.peer_id] = peer.launch_data()
return ldata
[docs] def peer_graph(self, neighbours_method):
gr = Graph(bidirectional=False)
vs = {}
for p in self.peers.values():
# print "vertices: ", vs
meth = getattr(p, neighbours_method)
ngs = meth()
# print "$$$$$$$$$$$$$$$$$$", p.peer_id, ngs, neighbours_method
# print p.config
if p not in vs:
# print "create vertex for ", p.peer_id,
ver_p = Vertex(gr, p)
vs[p] = ver_p
gr.add_vertex(ver_p)
# print "vvvv: ", gr.vertices()
for ne in ngs:
pr = self.peers[ne]
if pr not in vs:
# print "create second vertex for", pr.peer_id,
ver_ng = Vertex(gr, pr)
vs[pr] = ver_ng
gr.add_vertex(ver_ng)
# print "vvvvvv2 ", gr.vertices()
gr.add_edge(vs[p], vs[pr])
# print "vvvvv after edge: ", ver_p, ver_ng, "::::", gr.vertices()
return gr
def _topo_sort(self, neighbours_method):
gr = self.peer_graph(neighbours_method)
res, order = gr.topo_sort()
ret_order = []
for part in order:
ret_order.append([v._model.peer_id for v in part])
return ret_order
[docs] def peer_order(self):
order = self._topo_sort('list_launch_deps')
if order:
part1 = order[0]
if 'mx' in part1:
part1.remove('mx')
if 'config_server' in part1:
part1.remove('config_server')
order = [['mx'], ['config_server']] + order
return order
[docs] def peers_info(self):
peers = {}
for p in self.peers:
peers[p] = self.peers[p].info()
return peers
[docs] def info(self):
exp = {}
exp["uuid"] = self.uuid
exp["origin_machine"] = self.origin_machine
exp["launch_file_path"] = self.launch_file_path
peers = self.peers_info()
exp["peers"] = peers
return exp
[docs]class PeerConfigDescription(object):
def __init__(self, peer_id, experiment_id, config=None, path=None, machine=None,
logger=None):
self.peer_id = peer_id
self.experiment_id = experiment_id
self.config = PeerConfig(peer_id)
self.path = path
self.machine = machine
self.public_params = []
self.logger = logger or logging.getLogger("ObciExperimentConfig.peer_id")
self.del_after_stop = False
def __str__(self):
return self.peer_id
[docs] def ready(self, details=None):
loc_det = {}
ready = self.config is not None and \
self.path is not None and\
self.machine is not None and\
self.peer_id is not None
if not ready:
return ready
ready = self.config.config_sources_ready(loc_det) and ready
ready = self.config.launch_deps_ready(loc_det) and ready
if details is not None:
details[self.peer_id] = loc_det
return ready
[docs] def list_config_sources(self):
return [val for val in self.config.config_sources.values() if
val in self.config.used_config_sources()]
[docs] def list_launch_deps(self):
return list(self.config.launch_deps.values())
[docs] def status(self, peer_status_obj):
det = {}
ready = self.ready(det)
st = launcher_tools.READY_TO_LAUNCH if ready else launcher_tools.NOT_READY
peer_status_obj.set_status(st, details=det)
[docs] def peer_type(self):
if self.peer_id.startswith('mx'):
return 'multiplexer'
else:
return 'obci_peer'
[docs] def launch_data(self):
ser = PeerConfigSerializerCmd()
args = [self.peer_id]
peer_parser = peer.peer_config_parser.parser("ini")
base_config = PeerConfig(self.peer_id)
conf_path = launcher_tools.default_config_path(self.path)
if conf_path:
with codecs.open(conf_path, "r", "utf8") as f:
self.logger.info("parsing default config for peer %s, %s ",
self.peer_id, conf_path)
peer_parser.parse(f, base_config)
ser.serialize_diff(base_config, self.config, args)
return dict(peer_id=self.peer_id, experiment_id=self.experiment_id,
path=self.path, machine=self.machine,
args=args, peer_type=self.peer_type())
[docs] def info(self, detailed=False):
info = dict(peer_id=self.peer_id,
path=self.path, machine=self.machine, peer_type=self.peer_type()
)
if not self.config:
return info
info[CONFIG_SOURCES] = self.config.config_sources
info[LAUNCH_DEPENDENCIES] = self.config.launch_deps
if detailed:
info[LOCAL_PARAMS] = self.config.local_params
info[EXT_PARAMS] = self.config.ext_param_defs
return info
[docs]class OBCISystemConfigError(Exception):
pass
[docs]class OBCISystemConfigWarning(Warning):
pass