Files
annotations/Azaion.Inference/remote_command_handler.pyx
T
Alex Bezdieniezhnykh 472ed6533e fix hardware service
2025-05-02 13:25:33 +03:00

93 lines
3.2 KiB
Cython

import time
import zmq
from threading import Thread, Event
from remote_command cimport RemoteCommand
cimport constants
import yaml
cdef class RemoteCommandHandler:
    """Receive RemoteCommand messages over ZeroMQ and dispatch them to a callback.

    A ROUTER socket bound to ``tcp://*:<zmq_port>`` is bridged by a steerable
    proxy to a DEALER socket bound to ``inproc://backend``. Four worker
    threads connect to that backend, decode every incoming message with
    ``RemoteCommand.from_msgpack`` and pass the result to ``on_command``.
    Replies are pushed back through the same backend by ``send``.

    NOTE(review): the instance attributes assigned in ``__init__`` are
    presumably declared in the matching ``.pxd`` file -- confirm before adding
    new ones.
    """

    def __init__(self, int zmq_port, object on_command):
        """Create the sockets and the (not yet started) proxy/worker threads.

        Args:
            zmq_port: TCP port the frontend ROUTER socket binds to.
            on_command: Callable invoked with each decoded RemoteCommand.
        """
        self._on_command = on_command
        self._context = zmq.Context.instance()

        # Frontend: external clients talk to this socket.
        self._router = self._context.socket(zmq.ROUTER)
        self._router.setsockopt(zmq.LINGER, 0)
        self._router.bind(f'tcp://*:{zmq_port}')

        # Backend: worker threads and send() connect here.
        self._dealer = self._context.socket(zmq.DEALER)
        self._dealer.setsockopt(zmq.LINGER, 0)
        self._dealer.bind("inproc://backend")

        # Control endpoint read by the steerable proxy; stop() connects a
        # peer to it in order to deliver TERMINATE.
        self._control = self._context.socket(zmq.PAIR)
        self._control.bind("inproc://control")

        self._shutdown_event = Event()
        self._proxy_thread = Thread(target=self._proxy_loop, daemon=True)
        self._workers = []
        for _ in range(4):  # 4 worker threads
            worker = Thread(target=self._worker_loop, daemon=True)
            self._workers.append(worker)
        print(f'Listening to commands on port {zmq_port}...')

    cdef start(self):
        """Start the proxy thread first, then every worker thread."""
        self._proxy_thread.start()
        for worker in self._workers:
            worker.start()

    cdef _proxy_loop(self):
        """Run the steerable proxy between the frontend and the backend.

        Returns when the proxy terminates. A ZMQError raised while the
        shutdown event is set is treated as a normal exit; any other ZMQError
        is re-raised.
        """
        try:
            zmq.proxy_steerable(self._router, self._dealer, control=self._control)
        except zmq.error.ZMQError:
            if self._shutdown_event.is_set():
                print("Shutdown, exit proxy loop.")
            else:
                raise

    cdef _worker_loop(self):
        """Poll the backend and hand each decoded command to the callback.

        Runs until the shutdown event is set. Errors raised while handling a
        single message are logged and do not stop the loop.
        """
        worker_socket = self._context.socket(zmq.DEALER)
        worker_socket.setsockopt(zmq.LINGER, 0)
        worker_socket.connect("inproc://backend")
        poller = zmq.Poller()
        poller.register(worker_socket, zmq.POLLIN)
        try:
            while not self._shutdown_event.is_set():
                try:
                    # Bounded poll so the shutdown flag is re-checked
                    # at least every 500 ms.
                    socks = dict(poller.poll(500))
                    if worker_socket not in socks:
                        continue
                    # Frames are expected as [client identity, payload].
                    client_id, message = worker_socket.recv_multipart()
                    cmd = RemoteCommand.from_msgpack(<bytes> message)
                    cmd.client_id = client_id
                    constants.log(<str>f'{cmd}', client_id)
                    self._on_command(cmd)
                except Exception as e:
                    # Errors during shutdown are expected (sockets are being
                    # torn down), so only report them while still running.
                    if not self._shutdown_event.is_set():
                        constants.log(f"Worker error: {e}")
                        import traceback
                        traceback.print_exc()
        finally:
            worker_socket.close()

    cdef send(self, bytes client_id, bytes data):
        """Send ``data`` to the client identified by ``client_id``.

        A short-lived DEALER socket is connected to the backend and the
        message is sent as ``[client_id, data]``.

        NOTE(review): this temporary socket connects to the same backend
        endpoint as the workers, so the proxy may hand it an incoming client
        request while it is open -- verify this cannot drop commands.
        """
        with self._context.socket(zmq.DEALER) as socket:
            socket.connect("inproc://backend")
            socket.send_multipart([client_id, data])

    cdef stop(self):
        """Shut down the proxy and the workers, then terminate the context.

        The proxy is first asked to terminate through its control endpoint.
        If it has not exited after two seconds, the proxy sockets are closed
        anyway, which makes the proxy call fail and the thread exit. Safe to
        call when ``start`` was never called.
        """
        self._shutdown_event.set()

        if self._proxy_thread.is_alive():
            # self._control is the proxy's own end of the PAIR; commands have
            # to arrive from a peer connected to it.
            control_client = self._context.socket(zmq.PAIR)
            try:
                control_client.setsockopt(zmq.LINGER, 0)
                control_client.connect("inproc://control")
                control_client.send(b"TERMINATE", flags=zmq.DONTWAIT)
                # Keep the peer open while waiting so the command is not
                # discarded before the proxy reads it.
                self._proxy_thread.join(timeout=2)
            except zmq.error.ZMQError as e:
                constants.log(f"Could not send TERMINATE to proxy: {e}")
            finally:
                control_client.close(linger=0)

        self._router.close(linger=0)
        self._dealer.close(linger=0)
        self._control.close(linger=0)

        # Fallback: closing the sockets above forces a still-running proxy
        # to error out; give it a moment to finish.
        if self._proxy_thread.is_alive():
            self._proxy_thread.join(timeout=2)

        # Workers notice the shutdown event within one poll interval and
        # close their own sockets; wait for them before terminating.
        for worker in self._workers:
            if worker.is_alive():
                worker.join()
        self._context.term()