#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import os
import configparser
import threading
import logging
import time
from obci.control.common.obci_control_settings import INSTALL_DIR, OBCI_HOME_DIR, MAIN_CONFIG_NAME
from obci.control.common.config_helpers import OBCISystemError
[docs]def is_net_addr(addr):
return not addr.startswith('ipc') \
and not addr.startswith('inproc')
[docs]def addr_is_local(addr):
return addr.startswith('tcp://localhost') or\
addr.startswith('tcp://0.0.0.0') or\
addr.startswith('tcp://127.0.0.1')
[docs]def choose_local(addrs, ip=False):
result = []
if not ip:
result = [a for a in addrs if a.startswith('ipc://')]
if not result:
result += [a for a in addrs if addr_is_local(a)]
return result
[docs]def choose_not_local(addrs):
result = [a for a in addrs if a.startswith('tcp://') and not a.startswith('tcp://' + lo_ip()) and not
a.startswith('tcp://localhost')]
# if not result:
# result += [a for a in addrs if a.startswith('tcp://')]
return result
[docs]def choose_addr(addr_list):
nl = choose_not_local(addr_list)
if nl:
return nl[0]
else:
loc = choose_local(addr_list)
return loc[0] if loc else None
[docs]def lo_ip():
return '127.0.0.1'
[docs]def ext_ip(peer_ip=None):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_ip = ''
peer_ip = peer_ip if peer_ip else 'google.com'
try:
s.connect((peer_ip, 9))
client_ip = s.getsockname()[0]
except socket.error as e:
print("ext_ip(peer_ip: {0}): {1}".format(peer_ip, e))
client_ip = lo_ip()
del s
return client_ip
[docs]def server_address(sock_type='rep', local=False, peer_ip=None):
parser = __parser_main_config_file()
if sock_type == 'rep':
port = parser.get('server', 'port')
else:
port = parser.get('server', 'pub_port')
ip = lo_ip() if local else ext_ip(peer_ip=peer_ip)
return 'tcp://' + ip + ':' + port
def __parser_main_config_file():
directory = os.path.abspath(OBCI_HOME_DIR)
filename = MAIN_CONFIG_NAME
fpath = os.path.join(directory, filename)
parser = None
if os.path.exists(fpath):
parser = configparser.RawConfigParser()
with open(fpath) as f:
parser.readfp(f)
else:
print("Main config file not found in {0}".format(directory))
raise OBCISystemError()
return parser
[docs]def port(addr_string):
parts = addr_string.rsplit(':', 1)
if len(parts) < 2:
return None
maybe_port = parts[-1]
try:
port = int(maybe_port)
except ValueError as e:
return None
else:
return port
[docs]def is_ip(addr_string):
parts = addr_string.rsplit(':', 1)
if len(parts) < 2:
return False
nums = parts[0].split('.')
start = nums[0]
ind = nums[0].find('://')
if ind > -1:
start = start[ind + 3:]
nums[0] = start
if len(nums) < 4:
return False
for p in nums:
try:
n = int(p)
except Exception as e:
return False
return True
[docs]def server_pub_port():
parser = __parser_main_config_file()
port = parser.get('server', 'pub_port')
return port
[docs]def server_rep_port():
parser = __parser_main_config_file()
port = parser.get('server', 'port')
return port
[docs]def server_bcast_port():
parser = __parser_main_config_file()
try:
port = parser.get('server', 'bcast_port')
except Exception as e:
print("[ WARNING! WARNING! ] Config file is not up to date. Taking default bcast_port value!")
port = '23123'
return port
[docs]def server_tcp_proxy_port():
parser = __parser_main_config_file()
try:
port = parser.get('server', 'tcp_proxy_port')
except Exception as e:
print("[ WARNING! WARNING! ] Config file is not up to date. Taking default tcp_proxy_port value!")
port = '12012'
return port
[docs]def peer_loglevel():
parser = __parser_main_config_file()
try:
loglevel = parser.get("mx", "peer_loglevel")
except Exception:
print("[ WARNING! WARNING! ] Config file is not up to date. Taking default peer_loglevel value!")
loglevel = "debug"
return loglevel
[docs]class DNS(object):
def __init__(self, allowed_silence_time=45, logger=None):
self.__lock = threading.RLock()
self.__servers = {}
self.logger = logger or logging.getLogger("dns")
self.allowed_silence = allowed_silence_time
[docs] def tcp_rep_addr(self, hostname=None, ip=None, uuid=None):
srv = self._match_srv(hostname, ip, uuid)
return 'tcp://' + srv.ip + ':' + str(srv.rep_port)
def _match_srv(self, hostname=None, ip=None, uuid=None):
matches = []
if hostname is not None:
matches = self.__query('hostname', hostname)
elif ip is not None:
matches = self.__query('ip')
elif uuid is not None:
with self.__lock:
matches = [self.__servers[uuid]]
if matches == []:
raise Exception('Match not found')
if len(matches) > 1:
raise Exception('More than one match for given params:\
hostname: {0}, ip: {1}, uuid: {2} --- {3}'.format(hostname, ip, uuid, matches))
return matches.pop()
def __query(self, attribute, value):
matches = []
with self.__lock:
for srv in self.__servers.values():
if getattr(srv, 'hostname') == value:
matches.append(srv)
return matches
[docs] def http_addr(self, hostname=None, ip=None, uuid=None):
srv = self._match_srv(hostname, ip, uuid)
return srv.ip + ':' + srv.http_port
[docs] def hostname(self, ip=None, uuid=None):
srv = self._match_srv(ip=ip, uuid=uuid)
return srv.hostname
[docs] def ip(self, hostname=None, uuid=None):
srv = self._match_srv(hostname=hostname, uuid=uuid)
return srv.ip
[docs] def this_addr_local(self):
return socket.gethostname()
[docs] def this_addr_network(self):
try:
srv = self._match_srv(hostname=socket.gethostname())
except:
return socket.gethostname()
else:
return srv.ip
[docs] def is_this_machine(self, address):
addr = address
if address.startswith('tcp://'):
addr = addr[6:]
parts = addr.split(':')
if len(parts) > 0:
addr = parts[0]
return addr == self.this_addr_network() or addr == self.this_addr_local()
[docs] def update(self, ip, hostname, uuid, rep_port, pub_port, http_port=None):
with self.__lock:
old = self.__servers.get(uuid, None)
new = self.__servers[uuid] = PeerNetworkDescriptor(ip, hostname, uuid,
rep_port, pub_port,
http_port)
changed = old is None
if not changed:
changed = old.ip != new.ip or old.hostname != new.hostname
return changed
[docs] def mass_update(self, server_dict):
with self.__lock:
self.__servers = {}
for uid in server_dict:
self.__servers[uid] = PeerNetworkDescriptor(**server_dict[uid])
[docs] def clean_silent(self):
changed = False
with self.__lock:
check_time = time.time()
for uid in list(self.__servers.keys()):
srv = self.__servers[uid]
if srv.timestamp + self.allowed_silence < check_time:
changed = True
self.logger.warning("obci_server on " + str(srv.ip) + ' ' +
srv.hostname + " is most probably down.")
del self.__servers[uid]
return changed
[docs] def snapshot(self):
snapshot = {}
with self.__lock:
for uid in self.__servers.keys():
snapshot[uid] = self.__servers[uid]._copy()
return snapshot
[docs] def dict_snapshot(self):
snapshot = {}
with self.__lock:
for uid in self.__servers.keys():
snapshot[uid] = self.__servers[uid].as_dict()
return snapshot
[docs] def copy(self):
new = DNS()
new.allowed_silence = self.allowed_silence
new.mass_update(self.dict_snapshot())
return new
[docs]class PeerNetworkDescriptor(object):
def __init__(self, ip, hostname, uuid, rep_port, pub_port, http_port=None, timestamp=None):
self.ip = ip
self.hostname = hostname
self.uuid = uuid
self.rep_port = rep_port
self.pub_port = pub_port
self.http_port = http_port
self.timestamp = timestamp if timestamp is not None else time.time()
def __str__(self):
return str(self.as_dict())
def _copy(self):
desc = PeerNetworkDescriptor(self.ip, self.hostname, self.uuid,
self.rep_port, self.pub_port, self.http_port)
desc.timestamp = self.timestamp
return desc
[docs] def as_dict(self):
# dumb
return dict(vars(self))
if __name__ == '__main__':
# print ext_ip()
print(__file__)
print(INSTALL_DIR)