mirror of
https://github.com/azaion/annotations.git
synced 2026-04-22 21:56:31 +00:00
067f02cc63
rework AIAvailabilityStatus events to mediatr
37 lines
1.1 KiB
Cython
37 lines
1.1 KiB
Cython
cimport constants_inf
|
|
import msgpack
|
|
|
|
# Display label for each AIAvailabilityEnum member; looked up when a status
# is rendered as text.
AIStatus2Text = {
    AIAvailabilityEnum.NONE: "None",
    AIAvailabilityEnum.DOWNLOADING: "Downloading",
    AIAvailabilityEnum.CONVERTING: "Converting",
    AIAvailabilityEnum.UPLOADING: "Uploading",
    AIAvailabilityEnum.ENABLED: "Enabled",
    AIAvailabilityEnum.ERROR: "Error",
}
|
|
|
|
cdef class AIAvailabilityStatus:
    """Holds an availability status value and an optional error message.

    The object starts in ``AIAvailabilityEnum.NONE`` with no error message.
    ``set_status`` updates both fields and logs the change, ``serialize``
    packs them with msgpack, and ``str()`` renders a human-readable form.
    """

    def __init__(self):
        # Initial state: NONE status, no error recorded.
        self.status = AIAvailabilityEnum.NONE
        self.error_message = None

    def __str__(self):
        """Return the status label, followed by the error message if one is set.

        Statuses missing from ``AIStatus2Text`` are rendered as "Unknown".
        """
        status_text = AIStatus2Text.get(self.status, "Unknown")
        if not self.error_message:
            # No error text (None or empty): return the bare label rather
            # than "<label> " with a dangling trailing space.
            return status_text
        return f"{status_text} {self.error_message}"

    cdef bytes serialize(self):
        """Pack the current state with msgpack.

        Returns:
            The msgpack encoding of a map with key "s" (status) and
            key "m" (error message, possibly None).
        """
        return msgpack.packb({
            "s": self.status,
            "m": self.error_message,
        })

    cdef set_status(self, AIAvailabilityEnum status, str error_message=None):
        """Store a new status and error message, then log the change.

        Args:
            status: The new status value.
            error_message: Optional error text. When it is not None it is
                sent to ``constants_inf.logerror``; otherwise the string
                form of this object is sent to ``constants_inf.log``.
        """
        self.status = status
        self.error_message = error_message
        if error_message is not None:
            constants_inf.logerror(<str>error_message)
        else:
            constants_inf.log(<str>str(self))
|
|
|