import time import zmq from threading import Thread, Event from remote_command cimport RemoteCommand cimport constants cdef class RemoteCommandHandler: def __init__(self, object on_command): self._on_command = on_command self._context = zmq.Context.instance() self._router = self._context.socket(zmq.ROUTER) self._router.setsockopt(zmq.LINGER, 0) self._router.bind(f'tcp://*:{constants.ZMQ_PORT}') self._dealer = self._context.socket(zmq.DEALER) self._dealer.setsockopt(zmq.LINGER, 0) self._dealer.bind("inproc://backend") self._control = self._context.socket(zmq.PAIR) self._control.bind("inproc://control") self._shutdown_event = Event() self._proxy_thread = Thread(target=self._proxy_loop, daemon=True) self._workers = [] for _ in range(4): # 4 worker threads worker = Thread(target=self._worker_loop, daemon=True) self._workers.append(worker) cdef start(self): self._proxy_thread.start() for worker in self._workers: worker.start() cdef _proxy_loop(self): try: zmq.proxy_steerable(self._router, self._dealer, control=self._control) except zmq.error.ZMQError as e: if self._shutdown_event.is_set(): print("Shutdown, exit proxy loop.") else: raise cdef _worker_loop(self): worker_socket = self._context.socket(zmq.DEALER) worker_socket.setsockopt(zmq.LINGER, 0) worker_socket.connect("inproc://backend") poller = zmq.Poller() poller.register(worker_socket, zmq.POLLIN) try: while not self._shutdown_event.is_set(): try: socks = dict(poller.poll(500)) if worker_socket in socks: client_id, message = worker_socket.recv_multipart() cmd = RemoteCommand.from_msgpack( message) cmd.client_id = client_id print(f'Received [{cmd}] from the client {client_id}') self._on_command(cmd) except Exception as e: if not self._shutdown_event.is_set(): print(f"Worker error: {e}") import traceback traceback.print_exc() finally: worker_socket.close() cdef send(self, bytes client_id, bytes data): with self._context.socket(zmq.DEALER) as socket: socket.connect("inproc://backend") socket.send_multipart([client_id, data]) print(f'{len(data)} bytes was sent to client {client_id}') cdef stop(self): self._shutdown_event.set() try: self._control.send(b"TERMINATE", flags=zmq.DONTWAIT) except zmq.error.ZMQError: pass self._router.close(linger=0) self._dealer.close(linger=0) self._control.close(linger=0) self._proxy_thread.join(timeout=2) while any(w.is_alive() for w in self._workers): time.sleep(0.1) self._context.term()