import time
import traceback
from threading import Thread, Event, current_thread

import zmq

from remote_command cimport RemoteCommand
cimport constants

# inproc endpoint shared by the proxy backend, the workers and send().
_BACKEND_ENDPOINT = "inproc://backend"
# Number of worker threads consuming commands from the backend.
_WORKER_COUNT = 4
# How long a worker blocks in poll() before re-checking the shutdown flag.
_POLL_TIMEOUT_MS = 500
# Upper bound stop() waits for each worker thread to exit.
_WORKER_JOIN_TIMEOUT_S = 2.0


cdef class RemoteCommandHandler:
    """Receive remote commands over ZeroMQ and dispatch them to a callback.

    A ROUTER socket bound on ``tcp://*:<constants.ZMQ_PORT>`` is bridged by
    ``zmq.proxy`` to a DEALER socket bound on ``inproc://backend``.  Worker
    threads connect to that backend, decode each incoming message with
    ``RemoteCommand.from_msgpack`` and pass the result to ``on_command``.

    NOTE(review): the instance attributes assigned in ``__init__`` are not
    declared in this file; presumably they are declared in the accompanying
    ``.pxd`` -- confirm.
    """

    def __init__(self, object on_command):
        """Create sockets and (not yet started) proxy/worker threads.

        Args:
            on_command: callable invoked from a worker thread with each
                decoded command.
        """
        self._on_command = on_command
        self._context = zmq.Context.instance()
        self._shutdown_event = Event()

        self._router = self._context.socket(zmq.ROUTER)
        self._router.setsockopt(zmq.LINGER, 0)
        self._router.bind(f'tcp://*:{constants.ZMQ_PORT}')

        self._dealer = self._context.socket(zmq.DEALER)
        self._dealer.setsockopt(zmq.LINGER, 0)
        self._dealer.bind(_BACKEND_ENDPOINT)

        # cdef methods are C-level and are not exposed as bound Python
        # attributes, so they cannot be handed to Thread(target=...) directly.
        # Wrapping the call in a closure keeps `self` typed and dispatches
        # through the C method table.
        self._proxy_thread = Thread(
            target=lambda: self._proxy_loop(), daemon=True)
        self._workers = [
            Thread(target=lambda: self._worker_loop(), daemon=True)
            for _ in range(_WORKER_COUNT)
        ]

    cdef start(self):
        """Start the proxy thread and then every worker thread."""
        self._proxy_thread.start()
        for worker in self._workers:
            worker.start()

    cdef _proxy_loop(self):
        """Run the ROUTER<->DEALER proxy until the context is terminated.

        The proxy thread is the only user of the two sockets while it runs,
        so it is also the one that closes them; stop() triggers the exit by
        terminating the context.
        """
        try:
            zmq.proxy(self._router, self._dealer)
        except zmq.ContextTerminated:
            # Normal shutdown path: stop() called Context.term().
            pass
        finally:
            self._router.close()
            self._dealer.close()

    cdef _worker_loop(self):
        """Poll the backend for commands until shutdown is requested.

        Each two-frame message ``[client_id, payload]`` is decoded, tagged
        with the sender's id and passed to the ``on_command`` callback.
        Errors raised while handling a message are printed and the loop
        continues, so one bad message does not kill the worker.
        """
        worker_socket = self._context.socket(zmq.DEALER)
        worker_socket.setsockopt(zmq.LINGER, 0)
        worker_socket.connect(_BACKEND_ENDPOINT)

        poller = zmq.Poller()
        poller.register(worker_socket, zmq.POLLIN)
        print('started receiver loop...')

        try:
            while not self._shutdown_event.is_set():
                try:
                    socks = dict(poller.poll(_POLL_TIMEOUT_MS))
                    if worker_socket not in socks:
                        continue
                    client_id, message = worker_socket.recv_multipart()
                    cmd = RemoteCommand.from_msgpack(message)
                    cmd.client_id = client_id
                    print(f'Received [{cmd}] from the client {client_id}')
                    self._on_command(cmd)
                except zmq.ContextTerminated:
                    # The context is going away; nothing more can arrive.
                    break
                except Exception as e:
                    print(f"Worker error: {e}")
                    traceback.print_exc()
        finally:
            # Context.term() blocks until every socket is closed, so close
            # explicitly instead of relying on garbage collection.
            worker_socket.close()

    cdef send(self, bytes client_id, bytes data):
        """Send ``data`` to the client identified by ``client_id``.

        A short-lived DEALER socket pushes ``[client_id, data]`` into the
        backend; the proxy forwards it to the ROUTER socket.

        NOTE(review): while connected, this temporary socket is also a peer
        of the backend DEALER and could be handed an incoming client request
        that is then lost when the socket closes -- consider a dedicated
        outbound path.
        """
        with self._context.socket(zmq.DEALER) as sock:
            sock.connect(_BACKEND_ENDPOINT)
            sock.send_multipart([client_id, data])
            print(f'{len(data)} bytes was sent to client {client_id}')

    cdef stop(self):
        """Signal shutdown, wait for the threads and terminate the context.

        Workers are joined (instead of sleeping for a fixed interval) so
        their sockets are closed before the context is terminated.  The proxy
        sockets are closed by the proxy thread itself once ``Context.term()``
        interrupts ``zmq.proxy``; they are closed here only when the proxy
        thread is not running.
        """
        self._shutdown_event.set()

        this_thread = current_thread()
        for worker in self._workers:
            # A thread cannot join itself (stop() may be called from the
            # on_command callback), and unstarted threads are not alive.
            if worker is not this_thread and worker.is_alive():
                worker.join(_WORKER_JOIN_TIMEOUT_S)

        proxy_running = self._proxy_thread.is_alive()
        if not proxy_running:
            # Nobody else is using the sockets, so closing them here is safe.
            self._router.close()
            self._dealer.close()

        self._context.term()

        if proxy_running and self._proxy_thread is not this_thread:
            self._proxy_thread.join(_WORKER_JOIN_TIMEOUT_S)