mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 10:16:30 +00:00
fixed console Log
fix same files problem in python different libs correct command logging in command handler
This commit is contained in:
@@ -0,0 +1,93 @@
|
||||
import time
|
||||
import zmq
|
||||
from threading import Thread, Event
|
||||
from remote_command_inf cimport RemoteCommand
|
||||
cimport constants_inf
|
||||
|
||||
cdef class RemoteCommandHandler:
|
||||
def __init__(self, int zmq_port, object on_command):
|
||||
self._on_command = on_command
|
||||
self._context = zmq.Context()
|
||||
|
||||
self._router = self._context.socket(zmq.ROUTER)
|
||||
self._router.setsockopt(zmq.LINGER, 0)
|
||||
self._router.bind(f'tcp://*:{zmq_port}')
|
||||
|
||||
self._dealer = self._context.socket(zmq.DEALER)
|
||||
self._dealer.setsockopt(zmq.LINGER, 0)
|
||||
self._dealer.bind("inproc://backend")
|
||||
|
||||
self._control = self._context.socket(zmq.PAIR)
|
||||
self._control.bind("inproc://control")
|
||||
self._shutdown_event = Event()
|
||||
|
||||
self._proxy_thread = Thread(target=self._proxy_loop, daemon=True)
|
||||
|
||||
self._workers = []
|
||||
for _ in range(4): # 4 worker threads
|
||||
worker = Thread(target=self._worker_loop, daemon=True)
|
||||
self._workers.append(worker)
|
||||
constants_inf.log(f'Listening to commands on port {zmq_port}...')
|
||||
|
||||
cdef start(self):
|
||||
self._proxy_thread.start()
|
||||
for worker in self._workers:
|
||||
worker.start()
|
||||
|
||||
cdef _proxy_loop(self):
|
||||
try:
|
||||
zmq.proxy_steerable(self._router, self._dealer, control=self._control)
|
||||
except zmq.error.ZMQError as e:
|
||||
if self._shutdown_event.is_set():
|
||||
constants_inf.log("Shutdown, exit proxy loop.")
|
||||
else:
|
||||
raise
|
||||
|
||||
cdef _worker_loop(self):
|
||||
worker_socket = self._context.socket(zmq.DEALER)
|
||||
worker_socket.setsockopt(zmq.LINGER, 0)
|
||||
worker_socket.connect("inproc://backend")
|
||||
poller = zmq.Poller()
|
||||
poller.register(worker_socket, zmq.POLLIN)
|
||||
try:
|
||||
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
socks = dict(poller.poll(500))
|
||||
if worker_socket in socks:
|
||||
client_id, message = worker_socket.recv_multipart()
|
||||
cmd = RemoteCommand.from_msgpack(<bytes> message)
|
||||
cmd.client_id = client_id
|
||||
constants_inf.log(str(cmd))
|
||||
self._on_command(cmd)
|
||||
except Exception as e:
|
||||
if not self._shutdown_event.is_set():
|
||||
constants_inf.log(f"Worker error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
worker_socket.close()
|
||||
|
||||
cdef send(self, bytes client_id, bytes data):
|
||||
self._router.send_multipart([client_id, data])
|
||||
|
||||
# with self._context.socket(zmq.DEALER) as socket:
|
||||
# socket.connect("inproc://backend")
|
||||
# socket.send_multipart([client_id, data])
|
||||
# # constants.log(<str>f'Sent {len(data)} bytes.', client_id)
|
||||
|
||||
cdef stop(self):
|
||||
self._shutdown_event.set()
|
||||
try:
|
||||
self._control.send(b"TERMINATE", flags=zmq.DONTWAIT)
|
||||
except zmq.error.ZMQError:
|
||||
pass
|
||||
|
||||
self._router.close(linger=0)
|
||||
self._dealer.close(linger=0)
|
||||
self._control.close(linger=0)
|
||||
|
||||
self._proxy_thread.join(timeout=2)
|
||||
while any(w.is_alive() for w in self._workers):
|
||||
time.sleep(0.1)
|
||||
self._context.term()
|
||||
Reference in New Issue
Block a user