#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import io
import time
import socket
import zmq
from PyQt4 import QtCore
from obci.control.common.message import send_msg, OBCIMessageTool, PollingObject
from obci.acquisition import acquisition_helper
from obci.control.launcher.launcher_messages import message_templates
import obci.control.launcher.system_config as system_config
import obci.control.launcher.launch_file_parser as launch_file_parser
import obci.control.launcher.launcher_tools as launcher_tools
import obci.control.peer.peer_cmd as peer_cmd
import obci.control.common.net_tools as net
import obci.control.common.obci_control_settings as settings
MODE_BASIC = 'basic'
MODE_ADVANCED = 'advanced'
MODE_EXPERT = 'expert'
MODES = [MODE_ADVANCED, MODE_BASIC] # , MODE_EXPERT]
DEFAULT_CATEGORY = 'Uncategorised'
USER_CATEGORY = 'User defined'
SIGNAL_STORAGE_PEERS = {
"signal_saver": "acquisition/signal_saver_peer.py",
"tag_saver": "acquisition/tag_saver_peer.py",
"info_saver": "acquisition/info_saver_peer.py"
}
[docs]class ExperimentEngineInfo(QtCore.QObject):
exp_saver_msg = QtCore.pyqtSignal(object)
def __init__(self, preset_data=None, launcher_data=None, ctx=None):
self.preset_data = preset_data
self.launch_file = None
self.launcher_data = launcher_data
self.name = None
self.info = ""
self.public_params = []
self.origin_machine = ''
self.unsupervised_peers = {}
self.old_uid = None
self.ctx = ctx
self.exp_req = None
self.mtool = OBCIMessageTool(message_templates)
self.poller = PollingObject()
self.category = DEFAULT_CATEGORY
if preset_data is not None:
self.setup_from_preset(preset_data)
elif launcher_data is not None:
self.setup_from_launcher(launcher_data)
super(ExperimentEngineInfo, self).__init__()
[docs] def cleanup(self):
if self.exp_req:
self.exp_req.close() # linger=0)
[docs] def setup_from_preset(self, preset_data, launcher=False):
self.preset_data = preset_data
self.overwrites = {}
self.runtime_changes = {}
self.status = launcher_tools.ExperimentStatus()
self.exp_config = system_config.OBCIExperimentConfig()
self.name = preset_data['name']
self.launch_file = preset_data['launch_file']
self.info = preset_data['info']
self.public_params = [p.strip() for p in preset_data['public_params'].split(',')]
self.exp_config.uuid = self.name + '--' + self.launch_file
self.category = preset_data['category']
result, details = self._make_config()
self.status.details = details
self._set_public_params()
def _addr_connectable(self, addr, machine):
return machine == socket.gethostname() or \
(net.is_ip(addr) and not net.addr_is_local(addr))
[docs] def setup_from_launcher(self, launcher_data, preset=False, transform=False):
self.launcher_data = launcher_data
self.runtime_changes = {}
if preset:
self.old_uid = self.exp_config.uuid
if not preset or transform:
self.overwrites = {}
self.status = launcher_tools.ExperimentStatus()
self.exp_config = system_config.OBCIExperimentConfig()
self.name = launcher_data['name'] # if not preset else self.old_uid
if not preset:
self.launch_file = launcher_data['launch_file_path']
connected = False
if not transform:
self.ctx = self.ctx if self.ctx is not None else zmq.Context()
self.exp_req = self.ctx.socket(zmq.REQ)
machine = launcher_data['origin_machine']
for addr in launcher_data['rep_addrs']:
if self._addr_connectable(addr, machine):
try:
self.exp_req.connect(addr)
except Exception as e:
print(addr, False)
else:
connected = True
if not connected:
print("Connection to experiment ", self.name, "UNSUCCESFUL!!!!!!")
return
self.exp_config.uuid = launcher_data['uuid']
self.exp_config.origin_machine = launcher_data['origin_machine']
self.uuid = self.exp_config.uuid
self.exp_config.launch_file_path = self.launch_file
result, details = self._get_experiment_scenario()
self.exp_config.status(self.status)
self.status.set_status(launcher_data['status_name'], details=launcher_data['details'])
self._get_experiment_details()
self._set_public_params()
[docs] def update_scenario(self, launch_file_path, scenario):
self.exp_config.launch_file_path = launch_file_path
self._process_experiment_scenario(scenario)
self.exp_config.status(self.status)
def _set_public_params(self):
for par in self.public_params:
if len(par.split('.')) == 2:
[peer, param] = par.split('.')
self.exp_config.peers[peer].public_params.append(param)
def _make_config(self):
self.exp_config.launch_file_path = self.launch_file
self.uuid = self.exp_config.uuid
result, details = self.make_experiment_config()
self.exp_config.status(self.status)
return result, details
# FIXME !!! copy-paste from obci_experiment
[docs] def make_experiment_config(self):
launch_parser = launch_file_parser.LaunchFileParser(
launcher_tools.obci_root(), settings.DEFAULT_SCENARIO_DIR)
if not self.launch_file:
return False, "Empty scenario."
try:
with open(launcher_tools.expand_path(self.launch_file)) as f:
launch_parser.parse(f, self.exp_config, apply_globals=True)
except Exception as e:
self.status.set_status(launcher_tools.NOT_READY, details=str(e))
print("config errror ", str(e))
return False, str(e)
rd, details = self.exp_config.config_ready()
if rd:
self.status.set_status(launcher_tools.READY_TO_LAUNCH)
else:
self.status.set_status(launcher_tools.NOT_READY, details=details)
print(rd, details)
return True, None
def _get_experiment_scenario(self):
if not self.exp_req:
return False, "No experiment socket"
response = self.comm_exp(self.mtool.fill_msg("get_experiment_scenario"))
if not response:
return False, "No response from experient"
print("GOT SCENARIO", response.scenario)
return self._process_experiment_scenario(response.scenario)
def _process_experiment_scenario(self, json_scenario):
jsonpar = launch_file_parser.LaunchJSONParser(
launcher_tools.obci_root(), settings.DEFAULT_SCENARIO_DIR)
inbuf = io.StringIO(json_scenario)
jsonpar.parse(inbuf, self.exp_config)
print("MY PEEEEERS:", self.exp_config.peers.keys())
rd, details = self.exp_config.config_ready()
if rd:
self.status.set_status(launcher_tools.READY_TO_LAUNCH)
else:
self.status.set_status(launcher_tools.NOT_READY, details=details)
print(rd, details)
return True, None
def _get_experiment_details(self):
if not self.exp_req:
return
exp_msg = self.comm_exp(self.mtool.fill_msg("get_experiment_info"))
if not exp_msg:
return
self.origin_machine = exp_msg.origin_machine
for peer, short_info in exp_msg.peers.items():
# self.exp_config.set_peer_machine(peer, short_info['machine'])
msg = self.comm_exp(self.mtool.fill_msg("get_peer_info",
peer_id=peer))
if not msg:
return
ext_defs = {}
for name, defi in msg.external_params.items():
ext_defs[name] = defi[0] + '.' + defi[1]
self.exp_config.update_peer_config(peer, dict(config_sources=msg.config_sources,
launch_dependencies=msg.launch_dependencies,
local_params=msg.local_params,
external_params=ext_defs))
for peer, status in exp_msg.experiment_status['peers_status'].items():
self.status.peer_status(peer).set_status(
status['status_name'],
details=status['details'])
[docs] def parameters(self, peer_id, mode):
params = {}
peer = self.exp_config.peers[peer_id]
if mode == MODE_BASIC:
for par in peer.public_params:
params[par] = (self.exp_config.param_value(peer_id, par), None)
else:
params = peer.config.local_params
for param in peer.config.local_params:
params[param] = (self.exp_config.param_value(peer_id, param), None)
for param, defi in peer.config.ext_param_defs.items():
source_symbol = defi[0]
source = peer.config.config_sources[source_symbol]
params[param] = (self.exp_config.param_value(peer_id, param), source + '.' + defi[1])
return params
[docs] def comm_exp(self, msg):
send_msg(self.exp_req, msg)
response, _ = self.poller.poll_recv(self.exp_req, timeout=3000)
if not response:
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!1 no response to ", msg)
self.exp_req.close()
self.exp_req = self.ctx.socket(zmq.REQ)
for addr in self.launcher_data['rep_addrs']:
if self._addr_connectable(addr, self.launcher_data['origin_machine']):
self.exp_req.connect(addr)
return None
return self.mtool.unpack_msg(response)
[docs] def updatable(self, peer_id, config_part, **kwargs):
return False
[docs] def update_peer_param(self, peer_id, param, value, runtime=False):
changes = self.overwrites if not runtime else self.runtime_changes
ovr = changes.get(peer_id, None)
ovr = ovr if ovr is not None else {}
if param not in ovr:
old = self.exp_config.param_value(peer_id, param)
if old != value:
ovr[param] = old
changes[peer_id] = ovr
self.exp_config.update_local_param(peer_id, param, value)
[docs] def get_launch_args(self):
d = dict(launch_file=self.launch_file, name=self.name)
args = ['--ovr']
if self.overwrites:
for peer_id in self.overwrites:
args.append('--peer')
args.append(peer_id)
for arg in self.overwrites[peer_id]:
args += ['-p', arg, self.exp_config.param_value(peer_id, arg)]
pack = peer_cmd.peer_overwrites_pack(args)
d['overwrites'] = pack
print("overwrites pack!!!!!!!!!!!!!!!!!!!!! ", pack)
return d
[docs] def peer_info(self, peer_id):
return self.exp_config.peers[peer_id]
[docs] def add_peer(self, peer_id, peer_path, config_sources=None, launch_deps=None,
custom_config_path=None, param_overwrites=None, machine=None):
return launch_file_parser.extend_experiment_config(
self.exp_config,
peer_id,
peer_path,
config_sources,
launch_deps,
custom_config_path,
param_overwrites,
machine,
apply_globals=True)
[docs] def enable_signal_storing(self, store_options):
if not store_options:
return
if int(store_options['append_timestamp']):
store_options = dict(store_options)
store_options['save_file_name'] = store_options['save_file_name'] + "_" + str(time.time())
for peer, peer_path in SIGNAL_STORAGE_PEERS.items():
if peer not in self.exp_config.peers:
self.add_peer(peer, peer_path)
saver = self.exp_config.peers['signal_saver']
params = saver.config.param_values
for opt, val in store_options.items():
if opt in saver.config.param_values:
params[opt] = val
[docs] def stop_storing(self, client):
join_response = client.join_experiment(self.uuid, "dummy_module_" + str(time.time()), "")
if join_response is None:
print("experiment engine info - ERROR - connection timeout on stop signal storing!")
return
if not join_response.type == "rq_ok":
print("experiment engine info - ERROR - join error on stop signal storing!")
return
mx_addr = join_response.params["mx_addr"].split(':')
# hang and wait ...
acquisition_helper.finish_saving([(mx_addr[0], int(mx_addr[1]))])