mirror of
https://github.com/azaion/annotations.git
synced 2026-04-23 05:16:31 +00:00
add pxd headers for correct work
fixes definitions can run until API call
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
# cython: language_level=3
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
@@ -8,9 +7,10 @@ import msgpack
|
||||
from msgpack import packb
|
||||
from rstream import Producer, Consumer, AMQPMessage, ConsumerOffsetSpecification, OffsetType, MessageContext
|
||||
|
||||
import constants
|
||||
from api_client import ApiClient
|
||||
from processor_command import FileCommand, ProcessorType
|
||||
cimport constants
|
||||
from api_client cimport ApiClient
|
||||
from processor_command cimport FileCommand, ProcessorType
|
||||
from annotation cimport Annotation
|
||||
|
||||
cdef class QueueConfig:
|
||||
cdef str host
|
||||
@@ -35,15 +35,15 @@ cdef class QueueConfig:
|
||||
cdef class SocketHandler:
|
||||
"""Handles socket communication with size-prefixed messages."""
|
||||
|
||||
def __init__(self, on_message):
|
||||
def __init__(self, object on_message):
|
||||
self.on_message = on_message
|
||||
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._socket.connect((constants.SOCKET_HOST, constants.SOCKET_PORT))
|
||||
self._socket.bind((constants.SOCKET_HOST, constants.SOCKET_PORT))
|
||||
self._socket.listen(1)
|
||||
|
||||
cdef start(self):
|
||||
threading.Thread(target=self.start, daemon=True).start()
|
||||
threading.Thread(target=self.start_inner, daemon=True).start()
|
||||
|
||||
cdef start_inner(self):
|
||||
while True:
|
||||
@@ -62,53 +62,50 @@ cdef class SocketHandler:
|
||||
cmd = FileCommand.from_msgpack(data, ProcessorType.SOCKET)
|
||||
self.on_message(cmd)
|
||||
|
||||
async def send(self, object message):
|
||||
cdef send(self, list[Annotation] message):
|
||||
data = msgpack.packb(message)
|
||||
size_prefix = len(data).to_bytes(4, 'big')
|
||||
self._connection.sendall(size_prefix + data)
|
||||
|
||||
def close(self):
|
||||
cdef close(self):
|
||||
if self._socket:
|
||||
self._socket.close()
|
||||
self._socket = None
|
||||
|
||||
cdef class RabbitHandler:
|
||||
cdef str hardware_hash
|
||||
|
||||
def __init__(self, config_filename, on_message):
|
||||
def __init__(self, ApiClient api_client, object on_message):
|
||||
self.on_message = on_message
|
||||
cdef str config_str = ApiClient().load_file(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8')
|
||||
self.queue_config = QueueConfig.from_json(config_str)
|
||||
cdef str config_str = api_client.load_file(constants.QUEUE_CONFIG_FILENAME).decode(encoding='utf-8')
|
||||
queue_config = QueueConfig.from_json(config_str)
|
||||
self.annotation_producer = Producer(
|
||||
host=<str>self.queue_config.host,
|
||||
port=self.queue_config.port,
|
||||
username=<str>self.queue_config.producer_user,
|
||||
password=<str>self.queue_config.producer_pw
|
||||
host=<str>queue_config.host,
|
||||
port=queue_config.port,
|
||||
username=<str>queue_config.producer_user,
|
||||
password=<str>queue_config.producer_pw
|
||||
)
|
||||
self.command_consumer = Consumer(
|
||||
host=<str>self.queue_config.host,
|
||||
port=self.queue_config.port,
|
||||
username=<str>self.queue_config.consumer_user,
|
||||
password=<str>self.queue_config.consumer_pw
|
||||
host=<str>queue_config.host,
|
||||
port=queue_config.port,
|
||||
username=<str>queue_config.consumer_user,
|
||||
password=<str>queue_config.consumer_pw
|
||||
)
|
||||
|
||||
cdef start(self):
|
||||
self.command_consumer.start()
|
||||
self.command_consumer.subscribe(stream=constants.COMMANDS_QUEUE, callback=self.on_message_inner,
|
||||
self.command_consumer.subscribe(stream=<str>constants.COMMANDS_QUEUE, callback=self.on_message_inner,
|
||||
offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST, None)) # put real offset
|
||||
|
||||
|
||||
cdef on_message_inner(self, message: AMQPMessage, message_context: MessageContext):
|
||||
def on_message_inner(self, message: AMQPMessage, message_context: MessageContext):
|
||||
cdef bytes body = message.body
|
||||
cmd = FileCommand.from_msgpack(body, ProcessorType.RABBIT)
|
||||
self.on_message(cmd)
|
||||
|
||||
cpdef send(self, object message):
|
||||
cdef send(self, object message):
|
||||
packed_message = AMQPMessage(body=packb(message))
|
||||
self.annotation_producer.send(constants.ANNOTATIONS_QUEUE, packed_message)
|
||||
self.annotation_producer.send(<str>constants.ANNOTATIONS_QUEUE, packed_message)
|
||||
|
||||
async def close(self):
|
||||
cdef close(self):
|
||||
if self.annotation_producer:
|
||||
await self.annotation_producer.close()
|
||||
self.annotation_producer.close()
|
||||
if self.command_consumer:
|
||||
await self.command_consumer.close()
|
||||
self.command_consumer.close()
|
||||
Reference in New Issue
Block a user