Source code for obci.control.launcher.twisted_tcp_handling
#!/usr/bin/python
# -*- coding: utf-8 -*-
from twisted.protocols.basic import NetstringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor
import zmq
import socket
import threading
from obci.control.common.message import OBCIMessageTool, send_msg, PollingObject
from obci.control.launcher.launcher_messages import message_templates
from obci.control.common.obci_control_settings import PORT_RANGE
[docs]class OBCIProxy(NetstringReceiver):
[docs] def stringReceived(self, string):
req_sock = self.factory.ctx.socket(zmq.REQ)
req_sock.connect(self.factory.zmq_rep_addr)
try:
req = string
assert isinstance(req, bytes)
print("twisted got:", req)
bad = False
try:
parsed = self.factory.mtool.unpack_msg(req)
except ValueError:
bad = True
if not bad:
if parsed.type in self.factory.long_rqs:
sock, port = self.factory.long_rqs[parsed.type]
pull_addr = 'tcp://' + socket.gethostname() + ':' + str(port)
parsed.client_push_address = pull_addr
send_msg(req_sock, parsed.SerializeToString())
else:
send_msg(req_sock, req)
pl = PollingObject()
msg, det = pl.poll_recv(req_sock, timeout=5000)
finally:
req_sock.close()
if not msg:
msg = self.factory.mtool.fill_msg("rq_error", details=det)
if not bad:
if parsed.type in self.factory.long_rqs:
sock, port = self.factory.long_rqs[parsed.type]
msg, det = pl.poll_recv(sock, timeout=20000)
if not msg:
msg = self.factory.mtool.fill_msg("rq_error", details=det)
return
self.sendString(msg)
reactor.callFromThread(self.sendString, msg)
[docs]class OBCIProxyFactory(Factory):
protocol = OBCIProxy
def __init__(self, address, zmq_ctx, zmq_rep_addr):
self.srv_address = address
self.ctx = zmq_ctx
self.zmq_rep_addr = zmq_rep_addr.replace("*", socket.gethostname())
self.long_rqs = {}
for msgtype in ["find_eeg_experiments",
"find_eeg_amplifiers",
# "join_experiment",
"start_eeg_signal"]:
self.long_rqs[msgtype] = self._make_pull_sock()
self.mtool = OBCIMessageTool(message_templates)
def _make_pull_sock(self):
sock = self.ctx.socket(zmq.PULL)
port = sock.bind_to_random_port('tcp://*',
min_port=PORT_RANGE[0],
max_port=PORT_RANGE[1], max_tries=500)
return (sock, port)
[docs]def run_twisted_server(address, zmq_ctx, zmq_rep_addr):
fact = OBCIProxyFactory(address, zmq_ctx, zmq_rep_addr)
port = reactor.listenTCP(address[1], fact)
port = port.getHost().port
fact.srv_address = (address[0], port)
thr = threading.Thread(target=reactor.run, args=[False])
thr.daemon = True
thr.start()
print("Twisted: listening on port", port)
return thr, port
if __name__ == '__main__':
run_twisted_server(('0.0.0.0', 12013), zmq.Context(), 'tcp://127.0.0.1:54654')
print("twisted: server started.")