#!/usr/bin/python3
import os
import argparse
import socket
import time
import io
import zmq
from obci.control.common.message import send_msg, PollingObject
import obci.control.common.net_tools as net
import obci.control.common.obci_control_settings as settings
from obci.control.peer.peer_config_serializer import PeerConfigSerializerJSON
from obci.control.launcher.obci_control_peer import OBCIControlPeer, basic_arg_parser
from .subprocess_monitor import SubprocessMonitor, TimeoutDescription, NO_STDIO
from obci.control.launcher.obci_control_peer import RegistrationDescription
from . import subprocess_monitor
from . import launch_file_parser
from .launch_file_serializer import serialize_scenario_json
import obci.control.launcher.launcher_tools as launcher_tools
from . import system_config
import obci.control.peer.peer_cmd as peer_cmd
from obci.utils.openbci_logging import log_crash
from . import twisted_tcp_handling
REGISTER_TIMEOUT = 25
[docs]class OBCIExperiment(OBCIControlPeer):
msg_handlers = OBCIControlPeer.msg_handlers.copy()
@log_crash
def __init__(self, sandbox_dir, launch_file=None,
             source_addresses=None,
             source_pub_addresses=None,
             rep_addresses=None,
             pub_addresses=None,
             name='obci_experiment',
             current_ip=None,
             launch=False,
             overwrites=None):
    """Control peer supervising a single OpenBCI experiment.

    :param sandbox_dir: working directory for peers; falls back to
        ``settings.DEFAULT_SANDBOX_DIR`` when falsy.
    :param launch_file: scenario path. The literal strings ``'None'``
        and ``''`` (as passed from the command line) mean "no scenario".
    :param source_addresses: REP addresses of the parent obci_server.
    :param source_pub_addresses: PUB addresses of the parent obci_server.
    :param rep_addresses: addresses for this peer's REP socket.
    :param pub_addresses: addresses for this peer's PUB socket.
    :param name: human-readable experiment name.
    :param current_ip: IP advertised in generated socket addresses.
    :param launch: not used in this constructor — TODO confirm it can be dropped.
    :param overwrites: list of ``[param_overwrites, other]`` pairs applied
        on top of the parsed scenario.
    """
    # TODO TODO TODO !!!!
    # cleaner subclassing of obci_control_peer!!!
    self.source_pub_addresses = source_pub_addresses
    self.launch_file = launcher_tools.obci_root_relative(launch_file)
    self.origin_machine = socket.gethostname()
    self.poller = PollingObject()
    self.current_ip = current_ip
    self._nearby_machines = net.DNS()
    # Base-class init sets up uuid/logger/sockets and registers with the
    # server; it calls back into net_init() and params_for_registration().
    super(OBCIExperiment, self).__init__(
        source_addresses,
        rep_addresses,
        pub_addresses,
        name)
    self.name = name + ' on ' + socket.gethostname()
    self.sandbox_dir = sandbox_dir if sandbox_dir else settings.DEFAULT_SANDBOX_DIR
    self.supervisors = {}  # machine -> supervisor contact/other info
    self._wait_register = 0  # number of supervisors still expected to register
    self._ready_register = 0  # number of peers still expected to report ready
    self._kill_and_launch = None  # pending morph data (kill, launch, new_sv, keep_configs)
    self.__cfg_morph = False  # True while a config_server restart (morph) is pending
    self._exp_extension = {}  # machine -> [peer_id] added at runtime via add_peer
    self.sv_processes = {}  # machine -> Process objects)
    self.unsupervised_peers = {}
    self.subprocess_mgr = SubprocessMonitor(self.ctx, self.uuid, logger=self.logger)
    if launch_file in ['None', '']:  # command line arg
        self._initialize_experiment_without_config()
    else:
        self.exp_config, self.status = self._initialize_experiment_config(self.launch_file, overwrites)
        self.logger.info("initialised config")
        self.status_changed(self.status.status_name, self.status.details)
        self.logger.info("status changed!!!" + self.status.status_name)
    self.mx_addr = None
    self.mx_pass = None
def net_init(self):
    """Create this experiment's sockets on top of the base peer's.

    Adds: a SUB socket listening to the parent obci_server, a REP socket
    and a SUB socket for process supervisors, extra IP-based REP/PUB
    addresses, and a twisted-based TCP proxy front-end.
    """
    self.source_sub_socket = self.ctx.socket(zmq.SUB)
    self.source_sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
    if self.source_pub_addresses:
        for addr in self.source_pub_addresses:
            self.source_sub_socket.connect(addr)
    self._all_sockets.append(self.source_sub_socket)
    # REP socket on which process supervisors contact this experiment.
    (self.supervisors_rep, self.supervisors_rep_addrs) = self._init_socket(
        [],
        zmq.REP)
    # SUB socket connected lazily, as supervisors register (see
    # handle_register_peer); starts with no addresses.
    (self.supervisors_sub, self.supervisors_sub_addrs) = (self.ctx.socket(zmq.SUB), [])
    self._all_sockets.append(self.supervisors_sub)
    self._all_sockets.append(self.supervisors_rep)
    super(OBCIExperiment, self).net_init()
    # Advertise additional addresses based on self.current_ip (same ports).
    self.rep_addresses.append(self._ip_based_addr(net.choose_addr(self.rep_addresses)))
    self.pub_addresses.append(self._ip_based_addr(net.choose_addr(self.pub_addresses)))
    tcp_port = 0  # 0 = let the OS pick a free port
    self._tcp_proxy_thr, tcp_port = twisted_tcp_handling.run_twisted_server(
        ('0.0.0.0', tcp_port),
        self.ctx,
        self.rep_addresses[0])
    self.tcp_addresses = [(self.current_ip, tcp_port),
                          (socket.gethostname(), tcp_port)]
def _ip_based_addr(self, other_addr):
    """Rebuild *other_addr* as a tcp:// address using this peer's IP, keeping the port."""
    port = net.port(other_addr)
    return 'tcp://{0}:{1}'.format(self.current_ip, port)
def params_for_registration(self):
    """Extra parameters sent to obci_server when this experiment registers."""
    params = {
        'pid': os.getpid(),
        'origin_machine': self.origin_machine,
        'status_name': '',
        'details': '',
        'launch_file_path': self.launch_file,
        'tcp_addrs': [self.tcp_addresses[0]],
    }
    return params
def _handle_registration_response(self, response):
    """Feed the nearby-machines registry with data returned by obci_server."""
    machines = response.params
    self._nearby_machines.mass_update(machines)
def custom_sockets(self):
    """Sockets this subclass adds to the base peer's poll loop."""
    extra = [self.source_sub_socket]
    extra.append(self.supervisors_sub)
    extra.append(self.supervisors_rep)
    return extra
def args_for_process_sv(self, machine, local=False):
    """Build command-line arguments for an obci_process_supervisor.

    :param machine: hostname the supervisor will run on.
    :param local: ignored — locality is always recomputed from the
        nearby-machines registry below; kept only for interface
        compatibility with existing callers.
    :return: list of argument strings (addresses, sandbox, name, uuid).
    """
    # NOTE: the parameter is deliberately overridden — callers cannot force
    # locality; it is derived from the machine registry.
    local = self._nearby_machines.is_this_machine(machine)

    def _contact_addr(candidates):
        # IP-based address of this experiment; a local supervisor gets a
        # hostname-based address instead (same port).
        a = self._ip_based_addr(net.choose_addr(candidates))
        if local:
            port = net.port(a)
            a = 'tcp://' + socket.gethostname() + ':' + str(port)
        return a

    args = ['--sv-addresses', _contact_addr(self.supervisors_rep_addrs),
            '--sv-pub-addresses', _contact_addr(self.pub_addresses)]
    # Fall back to the launch-file name when the experiment has no custom name.
    name = self.name if self.name and self.name != 'obci_experiment' else\
        os.path.basename(self.launch_file)
    args += [
        '--sandbox-dir', str(self.sandbox_dir),
        '--name', name +
        '-' + self.uuid.split('-', 1)[0] +
        '-' + machine,
        '--experiment-uuid', self.uuid
    ]
    return args
def _start_obci_process_supervisor(self, machine_addr):
    """Start a single obci_process_supervisor, locally or remotely.

    :param machine_addr: hostname of the target machine.
    :return: ``(process_object, None)`` on success, ``(False, details)``
        on failure.
    """
    args = self.args_for_process_sv(machine_addr)
    proc_type = 'obci_process_supervisor'
    if machine_addr == self.origin_machine:
        # Local launch: spawn the supervisor directly as a child process.
        path = 'obci_process_supervisor'
        sv_obj, details = self.subprocess_mgr.new_local_process(path, args,
                                                               proc_type=proc_type,
                                                               capture_io=NO_STDIO)
    else:
        # Remote launch: delegate to the obci_server on the target machine.
        try:
            srv_ip = self._nearby_machines.ip(hostname=machine_addr)
        except Exception as e:
            # FIX: original message concatenated sentences without spaces
            # and silently dropped the underlying exception text.
            det = "Machine " + machine_addr + " not found, cannot launch remote process! " +\
                "Is obci_server running there? " +\
                "If yes, maybe you should wait for a few seconds and retry. (" + str(e) + ")"
            self.logger.critical(det)
            return False, det
        conn_addr = 'tcp://' + srv_ip + ':' + net.server_rep_port()
        sv_obj, details = self.subprocess_mgr.new_remote_process(path=None,
                                                                args=args, proc_type=proc_type,
                                                                name=self.uuid, machine_ip=machine_addr,
                                                                conn_addr=conn_addr, capture_io=NO_STDIO
                                                                )
    if sv_obj is None:
        return False, details
    # Fail the launch if the supervisor does not register back in time.
    timeout_handler = TimeoutDescription(timeout=REGISTER_TIMEOUT,
                                         timeout_method=self._handle_register_sv_timeout,
                                         timeout_args=[sv_obj])
    sv_obj.set_registration_timeout_handler(timeout_handler)
    self.sv_processes[machine_addr] = sv_obj
    return sv_obj, None
def _start_obci_process_supervisors(self, peer_machines):
    """Start one process supervisor per machine in *peer_machines*.

    Sets ``self._wait_register`` to the number of supervisors expected to
    register back. Aborts (and marks the experiment FAILED_LAUNCH) on the
    first supervisor that cannot be started.

    :return: ``(True, None)`` on success, ``(False, details)`` on failure.
    """
    self._wait_register = len(peer_machines)
    details = None
    for machine in peer_machines:
        result, details = self._start_obci_process_supervisor(machine)
        if not result:
            self.status.set_status(launcher_tools.FAILED_LAUNCH, details)
            details = "FAILED to start supervisor: {0}".format(details)
            self.logger.error(details)
            self.status_changed(self.status.status_name, self.status.details)
            return False, details
        # Re-index the process object under its resolved machine IP as well
        # (it was stored under the hostname inside the helper).
        k = result.machine_ip
        self.sv_processes[k] = result
    return True, details
def _send_launch_data(self):
    """Hook for pushing launch data to supervisors; intentionally a no-op here."""
def _start_experiment(self):
    """
    START EXPERIMENT!!!!
    ##################################################################

    Kick off the launch by starting process supervisors on every machine
    the scenario uses; publishes an 'experiment_launch_error' on failure.

    :return: ``(result, details)`` as produced by
        :meth:`_start_obci_process_supervisors`.
    """
    result, details = self._start_obci_process_supervisors(self.exp_config.peer_machines())
    if not result:
        send_msg(self._publish_socket, self.mtool.fill_msg("experiment_launch_error",
                                                           sender=self.uuid, details=details,
                                                           err_code='supervisor_launch_error'))
    return result, details
def _initialize_experiment_config(self, launch_file, overwrites=None):
    """Parse *launch_file* into a fresh experiment config and status.

    :param launch_file: scenario path (may be falsy — logged as an error).
    :param overwrites: optional list of ``[param_overwrites, other]`` pairs
        applied on top of the parsed config.
    :return: ``(exp_config, status)`` — status reflects parse/validation
        results; errors are recorded, not raised.
    """
    status = launcher_tools.ExperimentStatus()
    exp_config = system_config.OBCIExperimentConfig(uuid=self.uuid)
    exp_config.origin_machine = self.origin_machine
    exp_config.launch_file_path = launch_file
    result, details = self.make_experiment_config(exp_config, launch_file, status)
    if overwrites:
        try:
            for [ovr, other] in overwrites:
                exp_config.update_peer_config(other['peer_id'], ovr)
                if other['config_file']:
                    for f in other['config_file']:
                        exp_config.file_update_peer_config(other['peer_id'], f)
        except Exception as e:
            # Record the overwrite failure in the status instead of raising.
            details = str(e)
            exp_config.status(status)
            status.details = details
    # status_changed(status.status_name, status.details)
    if not launch_file:
        self.logger.error("No launch file")
    elif not result:
        self.logger.error("- - - - - - - NEW LAUNCH FILE INVALID!!! - - - - - - - "
                          "status: " + str(status.as_dict()) + str(details))
    return exp_config, status
def _initialize_experiment_without_config(self):
    """Set up an empty, NOT_READY experiment state when no launch file was given."""
    status = launcher_tools.ExperimentStatus()
    status.set_status(launcher_tools.NOT_READY, details="No launch_file")
    self.status = status
    config = system_config.OBCIExperimentConfig()
    config.origin_machine = self.origin_machine
    self.exp_config = config
def make_experiment_config(self, exp_config, launch_file, status):
    """Parse *launch_file* into *exp_config*, recording outcome in *status*.

    :param exp_config: config object to populate (mutated in place).
    :param launch_file: scenario path; empty means no scenario.
    :param status: ExperimentStatus updated to READY_TO_LAUNCH / NOT_READY.
    :return: ``(True, None)`` when parsing succeeded (even if the config is
        not yet complete), ``(False, error_message)`` on parse failure.
    """
    launch_parser = launch_file_parser.LaunchFileParser(
        launcher_tools.obci_root(), settings.DEFAULT_SCENARIO_DIR)
    if not launch_file:
        return False, "Empty scenario."
    try:
        with open(launcher_tools.expand_path(launch_file)) as f:
            self.logger.info("launch file opened " + launch_file)
            launch_parser.parse(f, exp_config, apply_globals=True)
    except Exception as e:
        self.logger.critical("Launch file invalid....." + launch_file)
        status.set_status(launcher_tools.NOT_READY, details=str(e))
        return False, str(e)
    # print self.exp_config
    # Parsing succeeded; check whether the config is complete enough to launch.
    rd, details = exp_config.config_ready()
    if rd:
        status.set_status(launcher_tools.READY_TO_LAUNCH)
    else:
        status.set_status(launcher_tools.NOT_READY, details=details)
    return True, None
def status_changed(self, status_name, details, peers=None):
    """Report a status change to the parent obci_server (blocking, 8 s timeout).

    :param status_name: new experiment status name.
    :param details: status details string.
    :param peers: optional {peer_id: status_name} mapping for per-peer updates.
    """
    # TODO use PUB/SUB pattern
    send_msg(
        self.source_req_socket,
        self.mtool.fill_msg(
            'experiment_status_change',
            status_name=status_name,
            details=details,
            uuid=self.uuid,
            peers=peers))
    # REQ socket: must consume the reply before the next send.
    self.poller.poll_recv(self.source_req_socket, timeout=8000)
def peer_type(self):
    """Identify this control peer's role within the launcher system."""
    kind = 'obci_experiment'
    return kind
@msg_handlers.handler('register_peer')
def handle_register_peer(self, message, sock):
    """Handle registration of a freshly started obci_process_supervisor.

    Records its addresses, subscribes to its PUB socket and replies with
    the launch data for its machine. Once every expected supervisor has
    registered, kicks off the next launch stage (morph / add_peer /
    fresh broker start).
    """
    if message.peer_type == "obci_process_supervisor":
        machine, pid = message.other_params['machine'], message.other_params['pid']
        if message.other_params['mx_data'] is not None and not self.mx_addr:
            # right now we support only one mx per obci instance
            ip = self._nearby_machines.ip(machine) if self._nearby_machines.dict_snapshot() else\
                machine
            self.mx_addr = ip + ':' + message.other_params['mx_data'][0].split(':')[1]
            self.mx_pass = message.other_params['mx_data'][1]
        proc = self.subprocess_mgr.process(machine, pid)
        if proc is None:
            send_msg(sock, self.mtool.fill_msg("rq_error",
                                               err_code='process_not_found', request=message.dict()))
            return
        status, details = proc.status()
        if status != subprocess_monitor.UNKNOWN:
            # Process already reported some status — it should not be
            # registering now; reject and broadcast a launch error.
            send_msg(sock, self.mtool.fill_msg("rq_error",
                                               err_code='process_status_invalid', request=message.dict(),
                                               details=(status, details)))
            send_msg(self._publish_socket, self.mtool.fill_msg("experiment_launch_error",
                                                               sender=self.uuid, details=(status, details),
                                                               err_code='registration_error'))
            return
        self.logger.info("exp registration message " + str(vars(message)))
        adr_list = [message.rep_addrs, message.pub_addrs]
        if machine != socket.gethostname():
            # Rewrite remote addresses to be reachable from this host:
            # keep the advertised port, substitute the machine's known IP.
            ip = self._nearby_machines.ip(machine)
            for i, addrs in enumerate([message.rep_addrs, message.pub_addrs]):
                first = addrs[0]
                port = net.port(first)
                adr_list[i] = ['tcp://' + ip + ':' + str(port)]
        self.logger.info("addresses after filtering: %s", str(adr_list))
        desc = self.supervisors[machine] = \
            RegistrationDescription(
                message.uuid,
                message.name,
                adr_list[0],
                adr_list[1],
                message.other_params['machine'],
                message.other_params['pid'])
        proc.registered(desc)
        a = self._choose_process_address(proc, desc.pub_addrs)
        if a is not None:
            self.supervisors_sub_addrs.append(a)
            self.supervisors_sub.setsockopt_string(zmq.SUBSCRIBE, "")
            self.supervisors_sub.connect(a)
            self.logger.info("Connecting to supervisor pub address {0} ({1})".format(a, machine))
        else:
            self.logger.error("Could not find suitable PUB address to connect. (supervisor on " + machine + ")")
        launch_data = self.exp_config.launch_data(machine)
        order = self.exp_config.peer_order()
        send_msg(sock, self.mtool.fill_msg("rq_ok", params=dict(launch_data=launch_data,
                                                                peer_order=order)))
        # inform observers
        send_msg(self._publish_socket, self.mtool.fill_msg("process_supervisor_registered",
                                                           sender=self.uuid,
                                                           machine_ip=desc.machine_ip))
        self._wait_register -= 1
        if self._wait_register == 0:
            # All supervisors are up — decide what to start next.
            if self._kill_and_launch:
                # Morph in progress: tell this supervisor what to kill/launch.
                kill, launch, new_supervisors, keep_configs = self._kill_and_launch
                to_launch = launch[machine]
                to_kill = kill.get(machine, [])
                send_msg(self._publish_socket, self.mtool.fill_msg("manage_peers",
                                                                   kill_peers=to_kill, start_peers_data=to_launch,
                                                                   receiver=desc.uuid))
            elif self._exp_extension:
                # Runtime add_peer: start only the newly added peer.
                ldata = {}
                peer_id = self._exp_extension[machine][0]
                ldata[peer_id] = self.exp_config.launch_data(machine)[peer_id]
                send_msg(self._publish_socket, self.mtool.fill_msg("start_peers", mx_data=self.mx_args(),
                                                                   add_launch_data={machine: ldata}))
            else:
                # Fresh launch: the multiplexer (broker) is started first.
                send_msg(self._publish_socket, self.mtool.fill_msg("start_broker",
                                                                   args=self.mx_args()))
def mx_args(self):
    """Command used to start/connect to the multiplexer (broker)."""
    # Password/rules options are currently disabled; kept for reference:
    # '--multiplexer-password', self.mx_pass
    # '--rules', launcher_tools.mx_rules_path()
    return ["run_multiplexer", self.mx_addr]
@msg_handlers.handler("launched_process_info")
[docs] def handle_launched_process_info(self, message, sock):
if message.proc_type == 'multiplexer':
self._wait_register = len(self.exp_config.peer_machines())
self.status.peer_status(message.name).set_status(
launcher_tools.RUNNING)
# tmp.synchro workaround: give MX some time to initialize
time.sleep(0.3)
send_msg(self._publish_socket, self.mtool.fill_msg('start_config_server', mx_data=self.mx_args()))
elif message.name == 'config_server':
self.status.peer_status(message.name).set_status(
launcher_tools.RUNNING)
time.sleep(0.3)
if self._kill_and_launch:
kill_data, launch_data, new_supervisors, keep_configs = self._kill_and_launch
for machine in kill_data:
to_kill = kill_data[machine]
if machine in launch_data:
to_launch = launch_data[machine]
else:
to_launch = {}
send_msg(self._publish_socket, self.mtool.fill_msg("manage_peers",
kill_peers=to_kill, start_peers_data=to_launch,
receiver=self.supervisors[machine].uuid))
for machine in launch_data:
if machine not in new_supervisors and machine not in kill_data:
to_kill = []
to_launch = launch_data[machine]
send_msg(
self._publish_socket,
self.mtool.fill_msg(
"manage_peers",
kill_peers=to_kill,
start_peers_data=to_launch,
receiver=self.supervisors[machine].uuid))
else:
send_msg(self._publish_socket, self.mtool.fill_msg('start_peers',
mx_data=self.mx_args()))
elif message.proc_type == 'obci_peer':
self.status.peer_status(message.name).set_status(
launcher_tools.LAUNCHING)
self.status_changed(self.status.status_name, self.status.details,
peers={message.name: self.status.peer_status(message.name).status_name})
@msg_handlers.handler("all_peers_launched")
[docs] def handle_all_peers_launched(self, message, sock):
if self._exp_extension:
self._exp_extension = {}
self.logger.info("all additional peers launched.")
return
self._wait_register -= 1
self.logger.info(str(message) + str(self._wait_register))
if self._wait_register == 0:
self.status.set_status(launcher_tools.LAUNCHING)
self.status_changed(self.status.status_name, self.status.details)
if not self._kill_and_launch: # if kill/launch, this variable was set in _kill_and_launch_peers()
# without mx and config_server, for now default is 1 mx
self._ready_register = len(self.exp_config.peers) - 2
def _choose_process_address(self, proc, addresses):
    """Pick one address from *addresses* to reach *proc*.

    Local processes prefer a local address; otherwise (or when none is
    available) fall back to a non-local one. Returns None if nothing fits.
    """
    self.logger.info("(exp) choosing sv address:" + str(addresses))
    candidates = net.choose_local(addresses) if proc.is_local() else []
    if not candidates:
        candidates = net.choose_not_local(addresses)
    return candidates.pop() if candidates else None
@msg_handlers.handler('get_experiment_info')
def handle_get_experiment_info(self, message, sock):
    """Reply with a full snapshot of this experiment's state."""
    send_msg(self._publish_socket, self.mtool.fill_msg('rq_ok'))
    info = self.mtool.fill_msg(
        'experiment_info',
        experiment_status=self.status.as_dict(),
        unsupervised_peers=self.unsupervised_peers,
        origin_machine=self.exp_config.origin_machine,
        uuid=self.exp_config.uuid,
        scenario_dir=self.exp_config.scenario_dir,
        peers=self.exp_config.peers_info(),
        launch_file_path=self.exp_config.launch_file_path,
        name=self.name)
    send_msg(sock, info)
@msg_handlers.handler('get_peer_info')
def handle_get_peer_info(self, message, sock):
    """Reply with detailed info about a single peer, or peer_id_not_found."""
    if message.peer_id not in self.exp_config.peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='peer_id_not_found'))
        return
    details = self.exp_config.peers[message.peer_id].info(detailed=True)
    send_msg(sock, self.mtool.fill_msg('peer_info', **details))
@msg_handlers.handler('get_peer_param_values')
def handle_get_peer_param_values(self, message, sock):
    """Reply with the resolved values of all of a peer's parameters."""
    peer_id = message.peer_id
    if peer_id not in self.exp_config.peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='peer_id_not_found'))
        return
    values = self.exp_config.all_param_values(peer_id)
    send_msg(sock, self.mtool.fill_msg('peer_param_values',
                                       peer_id=peer_id, param_values=values,
                                       sender=self.uuid))
@msg_handlers.handler('get_peer_config')
def handle_get_peer_config(self, message, sock):
    # NOTE(review): replies with a bare 'ping' instead of a peer config —
    # looks like an unimplemented stub; confirm intended behavior before
    # relying on this handler.
    send_msg(sock, self.mtool.fill_msg('ping', sender=self.uuid))
@msg_handlers.handler('get_experiment_scenario')
def handle_get_experiment_scenario(self, message, sock):
    """Serialize the current scenario to JSON and send it back."""
    scenario_json = serialize_scenario_json(self.exp_config)
    reply = self.mtool.fill_msg('experiment_scenario', scenario=scenario_json)
    send_msg(sock, reply)
@msg_handlers.handler('set_experiment_scenario')
def handle_set_experiment_scenario(self, message, sock):
    """Load a JSON scenario into an (until now) scenario-less experiment.

    Rejected with 'scenario_already_set' when peers are already configured.
    On success, broadcasts the new scenario and notifies obci_server.
    """
    if self.exp_config.peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', err_code='scenario_already_set'))
    else:
        jsonpar = launch_file_parser.LaunchJSONParser(
            launcher_tools.obci_root(), settings.DEFAULT_SCENARIO_DIR)
        self.exp_config.launch_file_path = None
        inbuf = io.StringIO(message.scenario)
        jsonpar.parse(inbuf, self.exp_config)
        self.logger.info("set experiment scenario............... %s" % message.scenario)
        rd, details = self.exp_config.config_ready()
        if rd:
            self.status.set_status(launcher_tools.READY_TO_LAUNCH)
        else:
            self.status.set_status(launcher_tools.NOT_READY, details=details)
            self.logger.warning("scenario not ready %s %s", str(rd), str(details))
        self.exp_config.status(self.status)
        self.launch_file = self.exp_config.launch_file_path = message.launch_file_path
        send_msg(sock, self.mtool.fill_msg('rq_ok'))
        # Broadcast the scenario to subscribers...
        send_msg(self._publish_socket, self.mtool.fill_msg('experiment_scenario',
                                                           scenario=message.scenario,
                                                           launch_file_path=message.launch_file_path,
                                                           uuid=self.uuid))
        # ...and inform the parent obci_server (REQ/REP, 8 s timeout).
        send_msg(
            self.source_req_socket,
            self.mtool.fill_msg(
                'experiment_info_change',
                uuid=self.uuid,
                name=self.name,
                launch_file_path=message.launch_file_path))
        self.poller.poll_recv(self.source_req_socket, timeout=8000)
@msg_handlers.handler('start_experiment')
def handle_start_experiment(self, message, sock):
    """Launch the configured scenario (supervisors first, then peers)."""
    if not self.status.status_name == launcher_tools.READY_TO_LAUNCH:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='exp_status_' + self.status.status_name,
                                           details=self.status.details))
    else:
        self.status.set_status(launcher_tools.LAUNCHING)
        # Ack immediately — launching is asynchronous; progress is
        # published on the PUB socket and via status_changed().
        send_msg(sock, self.mtool.fill_msg('starting_experiment',
                                           sender=self.uuid))
        self.status_changed(self.status.status_name, self.status.details)
        result, details = self._start_experiment()
        if not result:
            send_msg(self._publish_socket, self.mtool.fill_msg("experiment_launch_error",
                                                               sender=self.uuid, err_code='', details=details))
            self.logger.error("EXPERIMENT LAUNCH ERROR!!!")
            self.status.set_status(launcher_tools.FAILED_LAUNCH, details)
            self.status_changed(self.status.status_name, self.status.details)
@msg_handlers.handler('join_experiment')
def handle_join_experiment(self, message, sock):
    """Attach an externally started (unsupervised) peer to this experiment."""
    peer_id = message.peer_id
    if peer_id in self.exp_config.peers or peer_id in self.unsupervised_peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='peer_id_in_use'))
        return
    if self.mx_addr is None and 'mx' in self.exp_config.peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='mx_not_running'))
        return
    running_states = (launcher_tools.RUNNING, launcher_tools.LAUNCHING)
    if self.status.status_name not in running_states:
        # temporary status bug workaround.
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='exp_status_' + self.status.status_name,
                                           details=""))
        return
    self.unsupervised_peers[peer_id] = dict(peer_type=message.peer_type,
                                            path=message.path)
    send_msg(sock, self.mtool.fill_msg('rq_ok', params=dict(mx_addr=self.mx_addr)))
@msg_handlers.handler('leave_experiment')
def handle_leave_experiment(self, message, sock):
    """Detach a previously joined unsupervised peer."""
    peer_id = message.peer_id
    if peer_id not in self.unsupervised_peers:
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='peer_id_not_found'))
        return
    del self.unsupervised_peers[peer_id]
    send_msg(sock, self.mtool.fill_msg('rq_ok'))
@msg_handlers.handler('add_peer')
def handle_add_peer(self, message, sock):
    """Add new peer to existing scenario. It may run on a different machine than
    already running peers. add_peer works at runtime and before runtime.
    """
    self.logger.info("Handle add peer: " + str(message))
    machine = message.machine or self.origin_machine
    peer_id = message.peer_id
    _launching = None  # state of the (maybe ongoing) launching process
    if (self.status.status_name in launcher_tools.POST_RUN_STATUSES) or\
            self.status.status_name == launcher_tools.READY_TO_LAUNCH and\
            self.status.peer_status_exists(launcher_tools.LAUNCHING):
        self.logger.warning("add peer - experiment status incompatible!")
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='experiment_status_incompatible'))
        return
    if peer_id in self.exp_config.peers:
        self.logger.warning("add peer - peer id in use!")
        send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
                                           err_code='peer_id_in_use'))
        # FIX: previously execution fell through here, extending the config
        # for a duplicate id and potentially sending a second reply on the
        # REP socket.
        return
    try:
        self.logger.info("add peer - try extending config...")
        # True when every supervisor is already up (launch in progress/done).
        _launching = len(self.supervisors) == len(self.exp_config.peer_machines())
        launch_file_parser.extend_experiment_config(
            self.exp_config,
            peer_id,
            message.peer_path,
            config_sources=message.config_sources,
            launch_deps=message.launch_dependencies,
            custom_config_path=message.custom_config_path,
            param_overwrites=message.param_overwrites,
            machine=machine,
            apply_globals=message.apply_globals)
    except Exception as e:
        self.logger.warning("add peer - problem with extending config!")
        send_msg(sock, self.mtool.fill_msg('rq_error', details=str(e), request=message.dict(),
                                           err_code='problem_with_extending_config'))
        return
    else:
        self.logger.info("add peer - check config valid ...")
        rd, details = self.exp_config.config_ready()
        if rd:
            self.logger.info("add peer - config is valid!")
            # config is valid, we can proceed
            send_msg(sock, self.mtool.fill_msg('rq_ok'))
            self.status.peers_status[peer_id] = launcher_tools.PeerStatus(
                peer_id, status_name=launcher_tools.READY_TO_LAUNCH)
            ser = PeerConfigSerializerJSON()
            bt = io.StringIO()
            ser.serialize(self.exp_config.peers[peer_id].config, bt)
            peer_conf = bt.getvalue()
            # broadcast message about scenario modification
            self.logger.info("add peer - broadcast scenario mods...")
            send_msg(
                self._publish_socket,
                self.mtool.fill_msg(
                    "new_peer_added",
                    peer_id=peer_id,
                    machine=machine,
                    uuid=self.uuid,
                    status_name=launcher_tools.READY_TO_LAUNCH,
                    config=peer_conf,
                    peer_path=message.peer_path))
        else:
            self.logger.warning("add peer - config invalid!")
            send_msg(
                sock,
                self.mtool.fill_msg(
                    'rq_error',
                    err_code='config_incomplete',
                    details=details,
                    request=message.dict()))
            return
    if self.status.status_name not in launcher_tools.RUN_STATUSES:
        self.logger.info("additional peers will launch along with base scenario")
    else:
        # ...do actual work
        if machine not in self.supervisors and _launching:
            # New machine: spin up a supervisor there; the peer starts when
            # that supervisor registers back (see handle_register_peer).
            self.logger.info("add peer - start supervisor on machine: " + str(machine))
            self._exp_extension = {machine: [peer_id]}
            self._start_obci_process_supervisors([machine])
        elif not _launching:
            # the peer data will be sent to process supervisor in launch_data, so we do nothing
            self.logger.info("add peer - not _launching, nothing else to do")
        elif self.status.peer_status('mx').status_name == launcher_tools.RUNNING:
            # 'start_peers' has been already sent so we additionally request for launching the
            # new peer
            ldata = {}
            ldata[peer_id] = self.exp_config.launch_data(machine)[peer_id]
            self.logger.info("add peer - send start_peers...")
            send_msg(self._publish_socket, self.mtool.fill_msg("start_peers", mx_data=self.mx_args(),
                                                               add_launch_data={machine: ldata}))
        else:
            self.logger.info("add peer - last else, nothing else to do")
    self.logger.info("add peer - ALL DONE!")
@msg_handlers.handler("kill_peer")
[docs] def handle_kill_peer(self, message, sock):
peer_id = message.peer_id
peer = self.exp_config.peers.get(message.peer_id, None)
if not peer:
send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
err_code="peer_id_not_found"))
return
del self.exp_config.peers[peer_id]
rd, details = self.exp_config.config_ready()
self.exp_config.peers[peer_id] = peer
if not rd:
send_msg(sock, self.mtool.fill_msg('rq_error', request=message.dict(),
err_code="config_dependencies_error", details=details))
return
if message.remove_config:
peer.del_after_stop = True
send_msg(sock, self.mtool.fill_msg("rq_ok"))
send_msg(self._publish_socket, self.mtool.fill_msg("_kill_peer", peer_id=peer_id,
machine=(peer.machine or self.origin_machine), morph=False))
@msg_handlers.handler("obci_peer_registered")
[docs] def handle_obci_peer_registered(self, message, sock):
peer_id = message.peer_id
if peer_id not in self.exp_config.peers:
self.logger.error("Unknown Peer registered!!! {0}".format(peer_id))
else:
self.logger.info("Peer registered!!! {0}".format(peer_id))
for par, val in message.params.items():
self.exp_config.update_local_param(peer_id, par, val)
send_msg(
self._publish_socket,
self.mtool.fill_msg(
'obci_control_message',
severity='info',
msg_code='obci_peer_registered',
launcher_message=message.dict(),
sender=self.uuid,
peer_name=self.name,
peer_type=self.peer_type(),
sender_ip=self.origin_machine))
@msg_handlers.handler("obci_peer_params_changed")
[docs] def handle_obci_peer_params_changed(self, message, sock):
peer_id = message.peer_id
if peer_id not in self.exp_config.peers:
self.logger.error("Unknown Peer update!!! {0}".format(
self.name, self.peer_type(), peer_id))
else:
self.logger.info("Params changed!!! {0} {1}".format(
peer_id, message.params))
for par, val in message.params.items():
try:
self.exp_config.update_local_param(peer_id, par, val)
except Exception as e:
self.logger.error("Invalid params!!! {0} {1} {2}".format(peer_id,
message.params, str(e)))
send_msg(
self._publish_socket,
self.mtool.fill_msg(
'obci_control_message',
severity='info',
msg_code='obci_peer_params_changed',
launcher_message=message.dict(),
sender=self.uuid,
peer_name=self.name,
peer_type=self.peer_type(),
sender_ip=self.origin_machine))
@msg_handlers.handler("obci_peer_ready")
[docs] def handle_obci_peer_ready(self, message, sock):
peer_id = message.peer_id
if peer_id not in self.exp_config.peers:
self.logger.error("Unknown Peer update!!! {0}".format(peer_id))
return
self.status.peer_status(peer_id).set_status(
launcher_tools.RUNNING)
self._ready_register -= 1
self.logger.info("{0} peer ready! {1} to go".format(peer_id,
self._ready_register))
if self._ready_register == 0:
self.status.set_status(launcher_tools.RUNNING)
self.status_changed(self.status.status_name, self.status.details,
peers={peer_id: self.status.peer_status(peer_id).status_name})
# {"status": ["failed", null],
# "sender": "fb32da42-c8b6-47db-a958-249cd5e1f366",
# "receiver": "", "sender_ip": "127.0.0.1",
# "path": "/home/administrator/dev/openbci/acquisition/info_saver_peer.py",
# "peer_id": "info_saver", "type": "obci_peer_dead"}
@msg_handlers.handler('obci_peer_dead')
def handle_obci_peer_dead(self, message, sock):
    """React to a peer's death: update statuses, stop everything on FAILED,
    purge kill-and-remove peers and continue a pending morph."""
    if message.peer_id not in self.exp_config.peers:
        # during experiment transformation, 'obci_peer_dead' messages may come
        # after explicitly terminating peers and removing them from experiment peers
        # configuration
        self.logger.warning("peer_id" + str(message.peer_id) + "not found!")
        return
    status = message.status[0]
    details = message.status[1]
    self.status.peer_status(message.peer_id).set_status(status, details=details)
    if status == launcher_tools.FAILED:
        # One failed peer takes the whole experiment down.
        send_msg(self._publish_socket,
                 self.mtool.fill_msg("stop_all", receiver=""))
        self.status.set_status(launcher_tools.FAILED,
                               details='Failed process ' + message.peer_id)
        self.logger.error("Experiment failed: (process: %s)" % message.peer_id)
    elif not self.status.peer_status_exists(launcher_tools.RUNNING) and\
            (self.status.status_name not in [launcher_tools.FAILED, launcher_tools.FAILED_LAUNCH]):
        # No peer left running and no failure recorded: adopt the peer's
        # terminal status as the experiment's.
        self.status.set_status(status)
    if self.exp_config.peers[message.peer_id].del_after_stop:
        # kill_peer with remove_config: drop the peer entirely.
        del self.exp_config.peers[message.peer_id]
        self.status.del_peer_status(message.peer_id)
    if self.__cfg_morph and message.peer_id == 'config_server' and status == launcher_tools.TERMINATED:
        # Morph in progress: restart config_server with configs to restore.
        self.__cfg_morph = False
        configs_to_restore = self._kill_and_launch[3]
        send_msg(
            self._publish_socket,
            self.mtool.fill_msg(
                "start_config_server",
                mx_data=self.mx_args(),
                restore_config=configs_to_restore))
    # Re-broadcast the death notification tagged with this experiment's id.
    message.experiment_id = self.uuid
    send_msg(self._publish_socket, message.SerializeToString())
    self.status_changed(self.status.status_name, self.status.details)
@msg_handlers.handler('obci_launch_failed')
def handle_obci_launch_failed(self, message, sock):
    """Reset the runtime add_peer state when its launch fails."""
    if not self._exp_extension:
        return
    self.logger.error("launch of additional peers failed")
    self._exp_extension = {}
@msg_handlers.handler('launch_error')
def handle_launch_error(self, message, sock):
    """Mark a peer — and the whole experiment — as failed-to-launch and
    re-broadcast the error tagged with this experiment's uuid."""
    peer_id = message.details["peer_id"]
    if peer_id not in self.exp_config.peers:
        # FIX: use the extracted peer_id — 'launch_error' carries the id in
        # message.details, so message.peer_id may not exist and the old log
        # line could itself raise AttributeError.
        self.logger.error("peer_id " + str(peer_id) + " not found!")
        return
    self.status.peer_status(peer_id).set_status(launcher_tools.FAILED_LAUNCH)
    self.status.set_status(launcher_tools.FAILED_LAUNCH,
                           details='Failed to launch process ' + peer_id)
    message.sender = self.uuid
    send_msg(self._publish_socket, message.SerializeToString())
    self.status_changed(self.status.status_name, self.status.details)
@msg_handlers.handler('update_peer_config')
def handle_update_peer_config(self, message, sock):
    """Apply a configuration update to a peer; only allowed before launch."""
    allowed = [launcher_tools.NOT_READY,
               launcher_tools.READY_TO_LAUNCH]
    if self.status.status_name not in allowed:
        send_msg(sock, self.mtool.fill_msg('rq_error', err_code='update_not_possible',
                                           details='Experiment status: ' + self.status.status_name))
        return
    conf = {
        'local_params': message.local_params,
        'external_params': message.external_params,
        'launch_dependencies': message.launch_dependencies,
        'config_sources': message.config_sources,
    }
    try:
        self.exp_config.update_peer_config(message.peer_id, conf)
    except Exception as e:
        send_msg(sock, self.mtool.fill_msg('rq_error', err_code='update_failed',
                                           details=str(e)))
    else:
        send_msg(sock, self.mtool.fill_msg('rq_ok'))
@msg_handlers.handler("dead_process")
[docs] def handle_dead_process(self, message, sock):
proc = self.subprocess_mgr.process(message.machine, message.pid)
if proc is not None:
proc.mark_delete()
status, details = proc.status()
self.logger.warning("Process " + proc.proc_type + "dead:" +
status + str(details) + proc.name + str(proc.pid))
if proc.proc_type == 'obci_process_supervisor':
send_msg(self._publish_socket,
self.mtool.fill_msg("stop_all", receiver=""))
self.status.set_status(launcher_tools.FAILED,
details='Failed LAUNCHER component obci_process_supervisor')
elif proc.proc_type == 'obci_experiment':
pass
if status == subprocess_monitor.FAILED:
pass
self.status_changed(self.status.status_name, self.status.details)
@msg_handlers.handler('save_scenario')
def handle_save_scenario(self, message, sock):
    """Saving scenarios is not supported by the experiment peer."""
    reply = self.mtool.fill_msg('rq_error', err_code='action_not_supported')
    send_msg(sock, reply)
@msg_handlers.handler('nearby_machines')
def handle_nearby_machines(self, message, sock):
    """Refresh the machine registry and forward the update to subscribers."""
    self._nearby_machines.mass_update(message.nearby_machines)
    new_ip = self._nearby_machines.this_addr_network()
    self.current_ip = new_ip
    send_msg(self._publish_socket, message.SerializeToString())
@msg_handlers.handler("experiment_finished")
[docs] def handle_experiment_finished(self, message, sock):
# [make mx_messsage]
# [handler in config_server]
# stop_all
# status - finished
pass
@msg_handlers.handler("morph_to_new_scenario")
[docs] def handle_morph(self, message, sock):
# FIXME "LAUNCHING" -- msg bug workaround
if self.status.status_name not in [launcher_tools.RUNNING, launcher_tools.LAUNCHING]:
self.logger.error("EXPERIMENT STATUS NOT RUNNING, MORPH NOT ALLOWED")
if sock.getsockopt(zmq.TYPE) in [zmq.REQ, zmq.ROUTER]:
send_msg(sock, self.mtool.fill_msg('rq_error',
err_code='experiment_not_running',
details=self.status.details))
return
new_launch_file = launcher_tools.obci_root_relative(message.launch_file)
exp_config, status = self._initialize_experiment_config(new_launch_file,
message.overwrites)
self.logger.info("new launch status %s %s", str(exp_config), str(status.status_name))
if status.status_name != launcher_tools.READY_TO_LAUNCH:
self.logger.error("NEW SCENARIO NOT READY TO LAUNCH, MOPRH NOT ALLOWED")
if sock.getsockopt(zmq.TYPE) in [zmq.REQ, zmq.ROUTER]:
send_msg(sock, self.mtool.fill_msg('rq_error',
err_code='launch_file_invalid',
details=dict(status_name=status.status_name,
details=status.details)))
return
valid, details = self._validate_morph_leave_on(self.exp_config, exp_config, message.leave_on)
self.logger.info("morph valid: %s %s", str(valid), str(details))
if not valid:
if sock.getsockopt(zmq.TYPE) in [zmq.REQ, zmq.ROUTER]:
send_msg(sock, self.mtool.fill_msg('rq_error',
err_code='leave_on_peers_invalid',
details=details))
return
kill_list, launch_list = self._diff_scenarios(self.exp_config,
exp_config, message.leave_on)
self.logger.info("KILL_LIST " + str(kill_list))
self.logger.info("LAUNCH_LIST" + str(launch_list))
old_name = self.name
old_status = self.status
self.name = message.name if message.name is not None else new_launch_file
self.launch_file = new_launch_file
self.status = status
old_config = self.exp_config
self.exp_config = exp_config
for p in message.leave_on:
self.exp_config.peers[p] = old_config.peers[p]
if sock.getsockopt(zmq.TYPE) in [zmq.REP, zmq.ROUTER]:
send_msg(sock, self.mtool.fill_msg('starting_experiment'))
self.status.set_status(launcher_tools.LAUNCHING)
send_msg(
self.source_req_socket,
self.mtool.fill_msg(
'experiment_transformation',
status_name=self.status.status_name,
details=self.status.details,
uuid=self.uuid,
name=self.name,
launch_file=new_launch_file,
old_name=old_name,
old_launch_file=old_config.launch_file_path))
self.poller.poll_recv(self.source_req_socket, timeout=8000)
# TODO -- notice obci_server of name/config change
self._kill_and_launch_peers(kill_list, launch_list, self.exp_config, old_config)
self._kill_unused_supervisors()
pst = {}
for peer in self.status.peers_status:
if peer not in launch_list and peer not in kill_list:
self.status.peer_status(peer).set_status(old_status.peer_status(peer).status_name,
old_status.peer_status(peer).details)
pst[peer] = self.status.peer_status(peer).status_name
self.status_changed(self.status.status_name, self.status.details,
peers=pst)
# list: to kill, to restart (unless in leave-on)
# start supervisors if new machnes specified
# send launch_data to all
# start
# deregister / register in obci_server
def _kill_and_launch_peers(self, kill_list, launch_list, new_config, old_config):
    """Distribute kill/launch work per machine and spawn missing supervisors.

    Stores the work plan in ``self._kill_and_launch`` and triggers the
    morph by asking config_server to restart itself.
    """
    # Group peers-to-kill by the machine they currently run on.
    kill_data = {}
    for peer_id in kill_list:
        host = old_config.peers[peer_id].machine or self.origin_machine
        kill_data.setdefault(host, []).append(peer_id)

    # Collect launch data only for peers that are actually (re)started,
    # counting how many registrations to expect.
    launch_data = {}
    self._ready_register = 0
    for host in new_config.peer_machines():
        host_launch = new_config.launch_data(host)
        for peer_id in list(host_launch):
            if peer_id in launch_list:
                launch_data.setdefault(host, {})[peer_id] = host_launch[peer_id]
                self._ready_register += 1

    # Machines that appear only in the new scenario need a fresh
    # process supervisor.
    old_hosts = old_config.peer_machines()
    new_supervisors = [host for host in launch_data if host not in old_hosts]

    keep_configs = [peer for peer in old_config.peers if peer not in kill_list]
    self._kill_and_launch = (kill_data, launch_data, new_supervisors, keep_configs)
    if new_supervisors:
        self._wait_register = len(new_supervisors)
        self._start_obci_process_supervisors(new_supervisors)
    # --------------------------------------------------------------------------------------
    self.__cfg_morph = True
    send_msg(self._publish_socket, self.mtool.fill_msg("_kill_peer", peer_id="config_server", morph=True))
def _kill_unused_supervisors(self):
    # Intentionally a no-op: supervisors left without any peers after a
    # morph are currently kept alive. TODO -- implement the cleanup.
    pass
def _diff_scenarios(self, old_config, new_config, leave_on):
kill_list = []
for peer in old_config.peers:
if peer not in new_config.peers:
kill_list.append(peer)
launch_list = []
for peer in new_config.peers:
if peer not in old_config.peers:
launch_list.append(peer)
for peer in new_config.peers:
if peer in old_config.peers and peer not in leave_on and \
peer not in ['mx', 'config_server']:
kill_list.append(peer)
launch_list.append(peer)
return kill_list, launch_list
def _validate_morph_leave_on(self, old_config, new_config, leave_on):
for peer_id in leave_on:
old_p = old_config.peers.get(peer_id, None)
new_p = new_config.peers.get(peer_id, None)
if old_p is None or new_p is None:
return False, "Peer id " + peer_id + 'present old config: ' \
+ str(old_p is not None) + ', present in new config: ' +\
+ str(new_p is not None)
if old_p.path != new_p.path:
return False, "Peer ids [" + peer_id + "] point to different programs: " +\
"old: " + old_p.path + ', new: ' + new_p.path
old_machine = old_p.machine if old_p.machine else socket.gethostname()
new_machine = new_p.machine if new_p.machine else socket.gethostname()
if old_machine != new_machine:
return False, "Peer id " + peer_id + 'is to be launched on a different machine: ' +\
"old: " + old_machine + ', new:' + new_machine
else:
return True, ""
def cleanup_before_net_shutdown(self, kill_message, sock=None):
    """Broadcast a kill to all supervisors, then run base-class cleanup."""
    kill_msg = self.mtool.fill_msg("kill", receiver="", sender=self.uuid)
    send_msg(self._publish_socket, kill_msg)
    self.logger.info('sent KILL to supervisors')
    super(OBCIExperiment, self).cleanup_before_net_shutdown(kill_message, sock)
def clean_up(self):
    """Stop subprocess monitoring as the experiment shuts down."""
    self.logger.info("exp cleaning up")
    self.subprocess_mgr.stop_monitoring()
    # self._tcp_srv.shutdown()
def _handle_register_sv_timeout(self, sv_process):
    """Report a supervisor that failed to register before the timeout."""
    txt = "Supervisor for machine {0} FAILED TO REGISTER before timeout".format(
        sv_process.machine_ip)
    self.logger.error(txt)
    # Push the failure to observers over a short-lived socket.
    push_sock = self._push_sock(self.ctx, self._push_addr)
    error_msg = self.mtool.fill_msg(
        "experiment_launch_error",
        sender=self.uuid,
        err_code="create_supervisor_error",
        details=dict(machine=sv_process.machine_ip, error=txt))
    send_msg(push_sock, error_msg)
    push_sock.close()
@msg_handlers.handler("get_tail")
[docs] def handle_get_tail(self, message, sock):
if self.status.status_name == launcher_tools.RUNNING:
if message.peer_id not in self.exp_config.peers:
send_msg(sock, self.mtool.fill_msg("rq_error",
err_code="peer_not_found",
details="No such peer: " + message.peer_id))
return
machine = self.exp_config.peer_machine(message.peer_id)
self.logger.info("getting tail for %s %s", message.peer_id, machine)
send_msg(self._publish_socket, message.SerializeToString())
self.client_rq = (message, sock)
@msg_handlers.handler("tail")
[docs] def handle_tail(self, message, sock):
if self.client_rq:
if message.peer_id == self.client_rq[0].peer_id:
send_msg(self.client_rq[1], message.SerializeToString())
def _crash_extra_data(self, exception=None):
    """Extend base crash-report data with experiment identity and config."""
    import json
    data = super(OBCIExperiment, self)._crash_extra_data(exception)
    data['experiment_uuid'] = self.uuid
    data['launch_file_path'] = self.launch_file
    data['name'] = self.name
    # Round-trip through JSON so the report carries plain dicts/lists.
    data['config'] = json.loads(serialize_scenario_json(self.exp_config))
    return data
def _crash_extra_tags(self, exception=None):
    """Tag crash reports with the experiment uuid."""
    tags = super(OBCIExperiment, self)._crash_extra_tags(exception)
    tags['experiment_uuid'] = self.uuid
    return tags
def experiment_arg_parser():
    """Build the command-line parser for launching an OBCI experiment.

    Extends the shared ``basic_arg_parser()`` with experiment-specific
    options and returns the configured ``ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        parents=[basic_arg_parser()],
        description='Create, launch and manage an OBCI experiment.')
    add = parser.add_argument
    add('--sv-pub-addresses', nargs='+',
        help='Addresses of the PUB socket of the supervisor')
    add('--sandbox-dir',
        help='Directory to store temporary and log files')
    add('--launch-file',
        help='Experiment launch file')
    add('--name', default='obci_experiment',
        help='Human readable name of this process')
    add('--launch', default=False,
        help='Launch the experiment specified in launch file')
    add('--current-ip', help='IP addr of host machine')
    # Everything after --ovr is collected raw as peer-config overwrites.
    add('--ovr', nargs=argparse.REMAINDER)
    return parser
def run_obci_experiment():
    """Entry point: parse CLI arguments and run an OBCIExperiment."""
    args = experiment_arg_parser().parse_args()
    print(args)
    # Pack raw --ovr tokens into a peer-config overwrites structure.
    overwrites = peer_cmd.peer_overwrites_pack(args.ovr) if args.ovr is not None else None
    experiment = OBCIExperiment(args.sandbox_dir,
                                args.launch_file, args.sv_addresses, args.sv_pub_addresses,
                                args.rep_addresses, args.pub_addresses, args.name,
                                args.current_ip, args.launch, overwrites=overwrites)
    experiment.run()