#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import zmq
BASIC_MSG = dict(type='', sender='', receiver='', sender_ip='')
# BasicMessage = namedtuple('BasicMessage', 'type sender receiver')
# _basic_message = BasicMessage(type='basic_message', sender='', receiver='')
common_templates = {
"rq_ok": dict(status='', request='', params=''),
"rq_error": dict(err_code='', request='', details=''),
"kill": dict(),
"heartbeat": dict(),
"ping": dict(),
"pong": dict(),
"pub_addr_rq": dict(),
"pub_addr": dict(pub_addresses='', request='')
}
common_errors = ["invalid_msg_format",
"incomplete_message",
"unsupported_msg_type",
"no_pub_sock"]
[docs]class PollingObject(object):
def __init__(self):
self.poller = zmq.Poller()
[docs] def poll_recv(self, socket, timeout):
self.poller.register(socket, zmq.POLLIN)
socks = None
fail_det = None
try:
socks = dict(self.poller.poll(timeout=timeout))
except zmq.ZMQError as e:
fail_det = "obci_client: zmq.poll(): " + e.strerror
finally:
self.poller.unregister(socket)
if socks is None:
return None, fail_det
if socket in socks and socks[socket] == zmq.POLLIN:
return recv_msg(socket), None
else:
return None, "No data"
[docs]def send_msg(sock, message, flags=0):
assert isinstance(message, bytes)
return sock.send(message, flags=flags)
[docs]def recv_msg(sock, flags=0):
return sock.recv(flags=flags)
[docs]class LauncherMessage(object):
[docs] def SerializeToString(self):
return json.dumps(vars(self)).encode()
def __repr__(self):
return str(self.dict())
[docs] def raw(self):
return json.dumps(vars(self), sort_keys=True, indent=4).encode()
[docs] def ParseFromString(self, bytes):
message = json.loads(bytes.decode())
if not isinstance(message, dict): # TODO more general keyed collection?
raise OBCIMessageError()
for key in message:
setattr(self, key, message[key])
[docs] def keys(self):
return list(vars(self).keys())
[docs] def dict(self):
d = {}
for attr, val in vars(self).items():
d[attr] = val
return d
[docs]class OBCIMessageError(Exception):
pass