fix split tile size

rework inference events and handling
todo: add Medias table and reflect recognition status there
This commit is contained in:
Oleksandr Bezdieniezhnykh
2025-09-05 16:50:09 +03:00
parent 7d68f7faee
commit 9a16099194
14 changed files with 86 additions and 47 deletions
+1 -1
View File
@@ -327,7 +327,7 @@ public class AnnotatorEventHandler(
foreach (var res in results)
{
var time = TimeSpan.Zero;
var annotationName = $"{formState.MediaName}{Constants.SPLIT_SUFFIX}{res.Tile.Width}{res.Tile.Left:0000}_{res.Tile.Top:0000}!".ToTimeName(time);
var annotationName = $"{formState.MediaName}{Constants.SPLIT_SUFFIX}{res.Tile.Width}_{res.Tile.Left:0000}_{res.Tile.Top:0000}!".ToTimeName(time);
var tileImgPath = Path.Combine(dirConfig.Value.ImagesDirectory, $"{annotationName}{Constants.JPG_EXT}");
var bitmap = new CroppedBitmap(source, new Int32Rect((int)res.Tile.Left, (int)res.Tile.Top, (int)res.Tile.Width, (int)res.Tile.Height));
+2
View File
@@ -48,6 +48,8 @@ public enum CommandType
DataBytes = 25,
Inference = 30,
InferenceData = 35,
InferenceStatus = 37,
InferenceDone = 38,
StopInference = 40,
AIAvailabilityCheck = 80,
AIAvailabilityResult = 85,
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Text;
using Azaion.Common.Database;
using Azaion.Common.DTO;
using MediatR;
using MessagePack;
@@ -49,7 +50,7 @@ public class InferenceClient : IInferenceClient
Arguments = $"-p {_inferenceClientConfig.ZeroMqPort} -lp {_loaderClientConfig.ZeroMqPort} -a {_inferenceClientConfig.ApiUrl}",
CreateNoWindow = true
};
process.Start();
//process.Start();
}
catch (Exception e)
{
@@ -75,7 +76,15 @@ public class InferenceClient : IInferenceClient
switch (remoteCommand.CommandType)
{
case CommandType.InferenceData:
await _mediator.Publish(new InferenceDataEvent(remoteCommand), ct);
var annotationImage = MessagePackSerializer.Deserialize<AnnotationImage>(remoteCommand.Data, cancellationToken: ct);
await _mediator.Publish(new InferenceDataEvent(annotationImage), ct);
break;
case CommandType.InferenceStatus:
var statusEvent = MessagePackSerializer.Deserialize<InferenceStatusEvent>(remoteCommand.Data, cancellationToken: ct);
await _mediator.Publish(statusEvent, ct);
break;
case CommandType.InferenceDone:
await _mediator.Publish(new InferenceDoneEvent(), ct);
break;
case CommandType.AIAvailabilityResult:
var aiAvailabilityStatus = MessagePackSerializer.Deserialize<AIAvailabilityStatusEvent>(remoteCommand.Data, cancellationToken: ct);
@@ -12,32 +12,23 @@ public class InferenceServiceEventHandler(IInferenceService inferenceService,
IMediator mediator,
ILogger<InferenceServiceEventHandler> logger) :
INotificationHandler<InferenceDataEvent>,
INotificationHandler<AIAvailabilityStatusEvent>
INotificationHandler<InferenceStatusEvent>,
INotificationHandler<InferenceDoneEvent>
{
public async Task Handle(InferenceDataEvent e, CancellationToken ct)
{
try
{
if (e.Command.Message == "DONE")
{
await inferenceService.InferenceCancelTokenSource.CancelAsync();
return;
}
var annImage = MessagePackSerializer.Deserialize<AnnotationImage>(e.Command.Data, cancellationToken: ct);
var annotation = await annotationService.SaveAnnotation(annImage, ct);
await mediator.Publish(new AnnotationAddedEvent(annotation), ct);
}
catch (Exception ex)
{
logger.LogError(ex, ex.Message);
}
var annotation = await annotationService.SaveAnnotation(e.AnnotationImage, ct);
await mediator.Publish(new AnnotationAddedEvent(annotation), ct);
}
public async Task Handle(AIAvailabilityStatusEvent e, CancellationToken ct)
public async Task Handle(InferenceStatusEvent e, CancellationToken ct)
{
await mediator.Publish(new SetStatusTextEvent($"{e.MediaName}: {e.DetectionsCount} detections"), ct);
}
e.Status = AIAvailabilityEnum.Enabled;
public async Task Handle(InferenceDoneEvent notification, CancellationToken cancellationToken)
{
await inferenceService.InferenceCancelTokenSource.CancelAsync();
}
}
@@ -1,9 +1,22 @@
using Azaion.Common.DTO;
using Azaion.Common.Database;
using MediatR;
using MessagePack;
namespace Azaion.Common.Services.Inference;
public class InferenceDataEvent(RemoteCommand command) : INotification
public class InferenceDataEvent(AnnotationImage annotationImage) : INotification
{
public RemoteCommand Command { get; set; } = command;
public AnnotationImage AnnotationImage { get; set; } = annotationImage;
}
[MessagePackObject]
public class InferenceStatusEvent : INotification
{
[Key("mn")]
public string MediaName { get; set; }
[Key("dc")]
public int DetectionsCount { get; set; }
}
public class InferenceDoneEvent : INotification;
+1 -1
View File
@@ -65,7 +65,7 @@ public static class TileProcessor
tile.Left = Math.Max(0, Math.Min(originalSize.Width - maxSize, centerX - tile.Width / 2.0));
tile.Top = Math.Max(0, Math.Min(originalSize.Height - maxSize, centerY - tile.Height / 2.0));
return new TileResult(tile, selectedDetections);
return new TileResult( tile, selectedDetections);
}
}
+1
View File
@@ -12,6 +12,7 @@ cdef class Inference:
cdef RemoteCommandHandler remote_handler
cdef Annotation _previous_annotation
cdef dict[str, list(Detection)] _tile_detections
cdef dict[str, int] detection_counts
cdef AIRecognitionConfig ai_config
cdef bint stop_signal
cdef public AIAvailabilityStatus ai_availability_status
+26 -9
View File
@@ -1,13 +1,12 @@
import mimetypes
import time
from pathlib import Path
import cv2
import msgpack
import numpy as np
cimport constants_inf
from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus
from remote_command_inf cimport RemoteCommand
from annotation cimport Detection, Annotation
from ai_config cimport AIRecognitionConfig
import pynvml
@@ -60,6 +59,7 @@ cdef class Inference:
self.model_input = None
self.model_width = 0
self.model_height = 0
self.detection_counts = {}
self.engine = None
self.is_building_engine = False
self.ai_availability_status = AIAvailabilityStatus()
@@ -233,14 +233,17 @@ cdef class Inference:
if self.engine is None:
constants_inf.log(<str> "AI engine not available. Conversion may be in progress. Skipping inference.")
response = RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, self.ai_availability_status.serialize())
self.remote_handler.send(cmd.client_id, response.serialize())
self.remote_handler.send(cmd.client_id, <RemoteCommand>response)
return
for m in ai_config.paths:
if self.is_video(m):
videos.append(m)
self.detection_counts = {}
for p in ai_config.paths:
media_name = Path(<str>p).stem.replace(" ", "")
self.detection_counts[media_name] = 0
if self.is_video(p):
videos.append(p)
else:
images.append(m)
images.append(p)
# images first, it's faster
if len(images) > 0:
constants_inf.log(<str>f'run inference on {" ".join(images)}...')
@@ -295,7 +298,7 @@ cdef class Inference:
cdef on_annotation(self, RemoteCommand cmd, Annotation annotation):
cdef RemoteCommand response = RemoteCommand(CommandType.INFERENCE_DATA, annotation.serialize())
self.remote_handler.send(cmd.client_id, response.serialize())
self.remote_handler.send(cmd.client_id, response)
cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths):
cdef list frame_data
@@ -345,14 +348,16 @@ cdef class Inference:
y = img_h - tile_size
tile = frame[y:y_end, x:x_end]
name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}{x:04d}_{y:04d}!_000000'
name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000'
results.append((tile, original_media_name, name))
return results
cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data):
cdef list frames, original_media_names, names
cdef Annotation annotation
cdef int i
frames, original_media_names, names = map(list, zip(*frame_data))
input_blob = self.preprocess(frames)
outputs = self.engine.run(input_blob)
@@ -360,10 +365,22 @@ cdef class Inference:
for i in range(len(list_detections)):
annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i])
if self.is_valid_image_annotation(annotation):
constants_inf.log(<str> f'Detected {annotation}')
_, image = cv2.imencode('.jpg', frames[i])
annotation.image = image.tobytes()
self.on_annotation(cmd, annotation)
self.detection_counts[original_media_names[i]] = self.detection_counts.get(original_media_names[i], 0) + 1
# Send detecting status for each original media name
for media_name in self.detection_counts.keys():
try:
status = {
"mn": media_name,
"dc": self.detection_counts[media_name]
}
self.remote_handler.send(cmd.client_id, <RemoteCommand>RemoteCommand(CommandType.INFERENCE_STATUS, msgpack.packb(status)))
except Exception:
pass
cdef stop(self):
self.stop_signal = True
+1 -1
View File
@@ -19,7 +19,7 @@ cdef class LoaderClient:
cdef load_big_small_resource(self, str filename, str directory):
cdef FileData file_data = FileData(folder=directory, filename=filename)
cdef RemoteCommand response = self._send_receive_command(RemoteCommand(CommandType.LOAD_BIG_SMALL, data=file_data.serialize()))
cdef RemoteCommand response = self._send_receive_command(<RemoteCommand>RemoteCommand(CommandType.LOAD_BIG_SMALL, data=file_data.serialize()))
if response.command_type == CommandType.DATA_BYTES:
return LoadResult(None, response.data)
elif response.command_type == CommandType.ERROR:
+2 -4
View File
@@ -4,7 +4,6 @@ from queue import Queue
cimport constants_inf
from threading import Thread
from annotation cimport Annotation
from inference cimport Inference
from loader_client cimport LoaderClient
from remote_command_inf cimport RemoteCommand, CommandType
@@ -32,8 +31,7 @@ cdef class CommandProcessor:
try:
command = self.inference_queue.get(timeout=0.5)
self.inference.run_inference(command)
end_inference_command = RemoteCommand(CommandType.INFERENCE_DATA, None, 'DONE')
self.remote_handler.send(command.client_id, end_inference_command.serialize())
self.remote_handler.send(command.client_id, <RemoteCommand>RemoteCommand(CommandType.INFERENCE_DONE))
except queue.Empty:
continue
except Exception as e:
@@ -46,7 +44,7 @@ cdef class CommandProcessor:
self.inference_queue.put(command)
elif command.command_type == CommandType.AI_AVAILABILITY_CHECK:
status = self.inference.ai_availability_status.serialize()
self.remote_handler.send(command.client_id, RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, status).serialize())
self.remote_handler.send(command.client_id, <RemoteCommand>RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, status))
elif command.command_type == CommandType.STOP_INFERENCE:
self.inference.stop()
elif command.command_type == CommandType.EXIT:
@@ -1,3 +1,5 @@
from remote_command_inf cimport RemoteCommand
cdef class RemoteCommandHandler:
cdef object _context
cdef object _router
@@ -12,5 +14,5 @@ cdef class RemoteCommandHandler:
cdef start(self)
cdef _proxy_loop(self)
cdef _worker_loop(self)
cdef send(self, bytes client_id, bytes data)
cdef send(self, bytes client_id, RemoteCommand command)
cdef stop(self)
@@ -27,7 +27,7 @@ cdef class RemoteCommandHandler:
for _ in range(4): # 4 worker threads
worker = Thread(target=self._worker_loop, daemon=True)
self._workers.append(worker)
constants_inf.log(f'Listening to commands on port {zmq_port}...')
constants_inf.log(<str>f'Listening to commands on port {zmq_port}...')
cdef start(self):
self._proxy_thread.start()
@@ -39,7 +39,7 @@ cdef class RemoteCommandHandler:
zmq.proxy_steerable(self._router, self._dealer, control=self._control)
except zmq.error.ZMQError as e:
if self._shutdown_event.is_set():
constants_inf.log("Shutdown, exit proxy loop.")
constants_inf.log(<str>'Shutdown, exit proxy loop')
else:
raise
@@ -58,17 +58,19 @@ cdef class RemoteCommandHandler:
client_id, message = worker_socket.recv_multipart()
cmd = RemoteCommand.from_msgpack(<bytes> message)
cmd.client_id = client_id
constants_inf.log(str(cmd))
constants_inf.log(<str>f'<- {str(cmd)}')
self._on_command(cmd)
except Exception as e:
if not self._shutdown_event.is_set():
constants_inf.log(f"Worker error: {e}")
constants_inf.log(<str>f'Worker error: {e}')
import traceback
traceback.print_exc()
finally:
worker_socket.close()
cdef send(self, bytes client_id, bytes data):
cdef send(self, bytes client_id, RemoteCommand command):
constants_inf.log(<str> f'-> {str(command)}')
cdef bytes data = command.serialize()
self._router.send_multipart([client_id, data])
# with self._context.socket(zmq.DEALER) as socket:
+2
View File
@@ -10,6 +10,8 @@ cdef enum CommandType:
DATA_BYTES = 25
INFERENCE = 30
INFERENCE_DATA = 35
INFERENCE_STATUS = 37
INFERENCE_DONE = 38
STOP_INFERENCE = 40
AI_AVAILABILITY_CHECK = 80
AI_AVAILABILITY_RESULT = 85
+2
View File
@@ -19,6 +19,8 @@ cdef class RemoteCommand:
25: "DATA_BYTES",
30: "INFERENCE",
35: "INFERENCE_DATA",
37: "INFERENCE_STATUS",
38: "INFERENCE_DONE",
40: "STOP_INFERENCE",
80: "AI_AVAILABILITY_CHECK",
85: "AI_AVAILABILITY_RESULT",