diff --git a/Azaion.Annotator/AnnotatorEventHandler.cs b/Azaion.Annotator/AnnotatorEventHandler.cs index 86944c4..eea520a 100644 --- a/Azaion.Annotator/AnnotatorEventHandler.cs +++ b/Azaion.Annotator/AnnotatorEventHandler.cs @@ -327,7 +327,7 @@ public class AnnotatorEventHandler( foreach (var res in results) { var time = TimeSpan.Zero; - var annotationName = $"{formState.MediaName}{Constants.SPLIT_SUFFIX}{res.Tile.Width}{res.Tile.Left:0000}_{res.Tile.Top:0000}!".ToTimeName(time); + var annotationName = $"{formState.MediaName}{Constants.SPLIT_SUFFIX}{res.Tile.Width}_{res.Tile.Left:0000}_{res.Tile.Top:0000}!".ToTimeName(time); var tileImgPath = Path.Combine(dirConfig.Value.ImagesDirectory, $"{annotationName}{Constants.JPG_EXT}"); var bitmap = new CroppedBitmap(source, new Int32Rect((int)res.Tile.Left, (int)res.Tile.Top, (int)res.Tile.Width, (int)res.Tile.Height)); diff --git a/Azaion.Common/DTO/RemoteCommand.cs b/Azaion.Common/DTO/RemoteCommand.cs index 1dc9508..edd9cc6 100644 --- a/Azaion.Common/DTO/RemoteCommand.cs +++ b/Azaion.Common/DTO/RemoteCommand.cs @@ -48,6 +48,8 @@ public enum CommandType DataBytes = 25, Inference = 30, InferenceData = 35, + InferenceStatus = 37, + InferenceDone = 38, StopInference = 40, AIAvailabilityCheck = 80, AIAvailabilityResult = 85, diff --git a/Azaion.Common/Services/Inference/InferenceClient.cs b/Azaion.Common/Services/Inference/InferenceClient.cs index 2e612b9..114c4f2 100644 --- a/Azaion.Common/Services/Inference/InferenceClient.cs +++ b/Azaion.Common/Services/Inference/InferenceClient.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using System.Text; +using Azaion.Common.Database; using Azaion.Common.DTO; using MediatR; using MessagePack; @@ -49,7 +50,7 @@ public class InferenceClient : IInferenceClient Arguments = $"-p {_inferenceClientConfig.ZeroMqPort} -lp {_loaderClientConfig.ZeroMqPort} -a {_inferenceClientConfig.ApiUrl}", CreateNoWindow = true }; - process.Start(); + //process.Start(); } catch (Exception e) { @@ -75,7 +76,15 @@ public class InferenceClient : IInferenceClient switch (remoteCommand.CommandType) { case CommandType.InferenceData: - await _mediator.Publish(new InferenceDataEvent(remoteCommand), ct); + var annotationImage = MessagePackSerializer.Deserialize(remoteCommand.Data, cancellationToken: ct); + await _mediator.Publish(new InferenceDataEvent(annotationImage), ct); + break; + case CommandType.InferenceStatus: + var statusEvent = MessagePackSerializer.Deserialize(remoteCommand.Data, cancellationToken: ct); + await _mediator.Publish(statusEvent, ct); + break; + case CommandType.InferenceDone: + await _mediator.Publish(new InferenceDoneEvent(), ct); break; case CommandType.AIAvailabilityResult: var aiAvailabilityStatus = MessagePackSerializer.Deserialize(remoteCommand.Data, cancellationToken: ct); diff --git a/Azaion.Common/Services/Inference/InferenceServiceEventHandler.cs b/Azaion.Common/Services/Inference/InferenceServiceEventHandler.cs index 092edfb..5364285 100644 --- a/Azaion.Common/Services/Inference/InferenceServiceEventHandler.cs +++ b/Azaion.Common/Services/Inference/InferenceServiceEventHandler.cs @@ -12,32 +12,23 @@ public class InferenceServiceEventHandler(IInferenceService inferenceService, IMediator mediator, ILogger logger) : INotificationHandler, - INotificationHandler + INotificationHandler, + INotificationHandler { + public async Task Handle(InferenceDataEvent e, CancellationToken ct) { - try - { - if (e.Command.Message == "DONE") - { - await inferenceService.InferenceCancelTokenSource.CancelAsync(); - return; - } - - var annImage = MessagePackSerializer.Deserialize(e.Command.Data, cancellationToken: ct); - var annotation = await annotationService.SaveAnnotation(annImage, ct); - await mediator.Publish(new AnnotationAddedEvent(annotation), ct); - } - catch (Exception ex) - { - logger.LogError(ex, ex.Message); - } + var annotation = await annotationService.SaveAnnotation(e.AnnotationImage, ct); + await mediator.Publish(new AnnotationAddedEvent(annotation), ct); } - public async Task Handle(AIAvailabilityStatusEvent e, CancellationToken ct) + public async Task Handle(InferenceStatusEvent e, CancellationToken ct) { + await mediator.Publish(new SetStatusTextEvent($"{e.MediaName}: {e.DetectionsCount} detections"), ct); + } - e.Status = AIAvailabilityEnum.Enabled; - + public async Task Handle(InferenceDoneEvent notification, CancellationToken cancellationToken) + { + await inferenceService.InferenceCancelTokenSource.CancelAsync(); } } \ No newline at end of file diff --git a/Azaion.Common/Services/Inference/InferenceServiceEvents.cs b/Azaion.Common/Services/Inference/InferenceServiceEvents.cs index 94aff5c..abec0fc 100644 --- a/Azaion.Common/Services/Inference/InferenceServiceEvents.cs +++ b/Azaion.Common/Services/Inference/InferenceServiceEvents.cs @@ -1,9 +1,22 @@ -using Azaion.Common.DTO; +using Azaion.Common.Database; using MediatR; +using MessagePack; namespace Azaion.Common.Services.Inference; -public class InferenceDataEvent(RemoteCommand command) : INotification +public class InferenceDataEvent(AnnotationImage annotationImage) : INotification { - public RemoteCommand Command { get; set; } = command; + public AnnotationImage AnnotationImage { get; set; } = annotationImage; } + +[MessagePackObject] +public class InferenceStatusEvent : INotification +{ + [Key("mn")] + public string MediaName { get; set; } + + [Key("dc")] + public int DetectionsCount { get; set; } +} + +public class InferenceDoneEvent : INotification; \ No newline at end of file diff --git a/Azaion.Common/Services/TileProcessor.cs b/Azaion.Common/Services/TileProcessor.cs index bd23383..393809e 100644 --- a/Azaion.Common/Services/TileProcessor.cs +++ b/Azaion.Common/Services/TileProcessor.cs @@ -65,7 +65,7 @@ public static class TileProcessor tile.Left = Math.Max(0, Math.Min(originalSize.Width - maxSize, centerX - tile.Width / 2.0)); tile.Top = Math.Max(0, Math.Min(originalSize.Height - maxSize, centerY - tile.Height / 2.0)); - return new TileResult(tile, selectedDetections); + return new TileResult( tile, selectedDetections); } } \ No newline at end of file diff --git a/Azaion.Inference/inference.pxd b/Azaion.Inference/inference.pxd index 76b3150..9515b42 100644 --- a/Azaion.Inference/inference.pxd +++ b/Azaion.Inference/inference.pxd @@ -12,6 +12,7 @@ cdef class Inference: cdef RemoteCommandHandler remote_handler cdef Annotation _previous_annotation cdef dict[str, list(Detection)] _tile_detections + cdef dict[str, int] detection_counts cdef AIRecognitionConfig ai_config cdef bint stop_signal cdef public AIAvailabilityStatus ai_availability_status diff --git a/Azaion.Inference/inference.pyx b/Azaion.Inference/inference.pyx index 056a4c7..15cc569 100644 --- a/Azaion.Inference/inference.pyx +++ b/Azaion.Inference/inference.pyx @@ -1,13 +1,12 @@ import mimetypes -import time from pathlib import Path import cv2 +import msgpack import numpy as np cimport constants_inf from ai_availability_status cimport AIAvailabilityEnum, AIAvailabilityStatus -from remote_command_inf cimport RemoteCommand from annotation cimport Detection, Annotation from ai_config cimport AIRecognitionConfig import pynvml @@ -60,6 +59,7 @@ cdef class Inference: self.model_input = None self.model_width = 0 self.model_height = 0 + self.detection_counts = {} self.engine = None self.is_building_engine = False self.ai_availability_status = AIAvailabilityStatus() @@ -233,14 +233,17 @@ cdef class Inference: if self.engine is None: constants_inf.log( "AI engine not available. Conversion may be in progress. Skipping inference.") response = RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, self.ai_availability_status.serialize()) - self.remote_handler.send(cmd.client_id, response.serialize()) + self.remote_handler.send(cmd.client_id, response) return - for m in ai_config.paths: - if self.is_video(m): - videos.append(m) + self.detection_counts = {} + for p in ai_config.paths: + media_name = Path(p).stem.replace(" ", "") + self.detection_counts[media_name] = 0 + if self.is_video(p): + videos.append(p) else: - images.append(m) + images.append(p) # images first, it's faster if len(images) > 0: constants_inf.log(f'run inference on {" ".join(images)}...') @@ -295,7 +298,7 @@ cdef class Inference: cdef on_annotation(self, RemoteCommand cmd, Annotation annotation): cdef RemoteCommand response = RemoteCommand(CommandType.INFERENCE_DATA, annotation.serialize()) - self.remote_handler.send(cmd.client_id, response.serialize()) + self.remote_handler.send(cmd.client_id, response) cdef _process_images(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list[str] image_paths): cdef list frame_data @@ -345,14 +348,16 @@ cdef class Inference: y = img_h - tile_size tile = frame[y:y_end, x:x_end] - name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}{x:04d}_{y:04d}!_000000' + name = f'{original_media_name}{constants_inf.SPLIT_SUFFIX}{tile_size:04d}_{x:04d}_{y:04d}!_000000' results.append((tile, original_media_name, name)) return results cdef _process_images_inner(self, RemoteCommand cmd, AIRecognitionConfig ai_config, list frame_data): cdef list frames, original_media_names, names cdef Annotation annotation + cdef int i frames, original_media_names, names = map(list, zip(*frame_data)) + input_blob = self.preprocess(frames) outputs = self.engine.run(input_blob) @@ -360,10 +365,22 @@ cdef class Inference: for i in range(len(list_detections)): annotation = Annotation(names[i], original_media_names[i], 0, list_detections[i]) if self.is_valid_image_annotation(annotation): + constants_inf.log( f'Detected {annotation}') _, image = cv2.imencode('.jpg', frames[i]) annotation.image = image.tobytes() self.on_annotation(cmd, annotation) + self.detection_counts[original_media_names[i]] = self.detection_counts.get(original_media_names[i], 0) + 1 + # Send detecting status for each original media name + for media_name in self.detection_counts.keys(): + try: + status = { + "mn": media_name, + "dc": self.detection_counts[media_name] + } + self.remote_handler.send(cmd.client_id, RemoteCommand(CommandType.INFERENCE_STATUS, msgpack.packb(status))) + except Exception: + pass cdef stop(self): self.stop_signal = True diff --git a/Azaion.Inference/loader_client.pyx b/Azaion.Inference/loader_client.pyx index 4b6eec7..f53cb36 100644 --- a/Azaion.Inference/loader_client.pyx +++ b/Azaion.Inference/loader_client.pyx @@ -19,7 +19,7 @@ cdef class LoaderClient: cdef load_big_small_resource(self, str filename, str directory): cdef FileData file_data = FileData(folder=directory, filename=filename) - cdef RemoteCommand response = self._send_receive_command(RemoteCommand(CommandType.LOAD_BIG_SMALL, data=file_data.serialize())) + cdef RemoteCommand response = self._send_receive_command(RemoteCommand(CommandType.LOAD_BIG_SMALL, data=file_data.serialize())) if response.command_type == CommandType.DATA_BYTES: return LoadResult(None, response.data) elif response.command_type == CommandType.ERROR: diff --git a/Azaion.Inference/main_inference.pyx b/Azaion.Inference/main_inference.pyx index 766d90b..74eb834 100644 --- a/Azaion.Inference/main_inference.pyx +++ b/Azaion.Inference/main_inference.pyx @@ -4,7 +4,6 @@ from queue import Queue cimport constants_inf from threading import Thread -from annotation cimport Annotation from inference cimport Inference from loader_client cimport LoaderClient from remote_command_inf cimport RemoteCommand, CommandType @@ -32,8 +31,7 @@ cdef class CommandProcessor: try: command = self.inference_queue.get(timeout=0.5) self.inference.run_inference(command) - end_inference_command = RemoteCommand(CommandType.INFERENCE_DATA, None, 'DONE') - self.remote_handler.send(command.client_id, end_inference_command.serialize()) + self.remote_handler.send(command.client_id, RemoteCommand(CommandType.INFERENCE_DONE)) except queue.Empty: continue except Exception as e: @@ -46,7 +44,7 @@ cdef class CommandProcessor: self.inference_queue.put(command) elif command.command_type == CommandType.AI_AVAILABILITY_CHECK: status = self.inference.ai_availability_status.serialize() - self.remote_handler.send(command.client_id, RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, status).serialize()) + self.remote_handler.send(command.client_id, RemoteCommand(CommandType.AI_AVAILABILITY_RESULT, status)) elif command.command_type == CommandType.STOP_INFERENCE: self.inference.stop() elif command.command_type == CommandType.EXIT: diff --git a/Azaion.Inference/remote_command_handler_inf.pxd b/Azaion.Inference/remote_command_handler_inf.pxd index b4aaba9..12eb568 100644 --- a/Azaion.Inference/remote_command_handler_inf.pxd +++ b/Azaion.Inference/remote_command_handler_inf.pxd @@ -1,3 +1,5 @@ +from remote_command_inf cimport RemoteCommand + cdef class RemoteCommandHandler: cdef object _context cdef object _router @@ -12,5 +14,5 @@ cdef class RemoteCommandHandler: cdef start(self) cdef _proxy_loop(self) cdef _worker_loop(self) - cdef send(self, bytes client_id, bytes data) + cdef send(self, bytes client_id, RemoteCommand command) cdef stop(self) diff --git a/Azaion.Inference/remote_command_handler_inf.pyx b/Azaion.Inference/remote_command_handler_inf.pyx index 9553a82..ecdd0d6 100644 --- a/Azaion.Inference/remote_command_handler_inf.pyx +++ b/Azaion.Inference/remote_command_handler_inf.pyx @@ -27,7 +27,7 @@ cdef class RemoteCommandHandler: for _ in range(4): # 4 worker threads worker = Thread(target=self._worker_loop, daemon=True) self._workers.append(worker) - constants_inf.log(f'Listening to commands on port {zmq_port}...') + constants_inf.log(f'Listening to commands on port {zmq_port}...') cdef start(self): self._proxy_thread.start() @@ -39,7 +39,7 @@ cdef class RemoteCommandHandler: zmq.proxy_steerable(self._router, self._dealer, control=self._control) except zmq.error.ZMQError as e: if self._shutdown_event.is_set(): - constants_inf.log("Shutdown, exit proxy loop.") + constants_inf.log('Shutdown, exit proxy loop') else: raise @@ -58,17 +58,19 @@ cdef class RemoteCommandHandler: client_id, message = worker_socket.recv_multipart() cmd = RemoteCommand.from_msgpack( message) cmd.client_id = client_id - constants_inf.log(str(cmd)) + constants_inf.log(f'<- {str(cmd)}') self._on_command(cmd) except Exception as e: if not self._shutdown_event.is_set(): - constants_inf.log(f"Worker error: {e}") + constants_inf.log(f'Worker error: {e}') import traceback traceback.print_exc() finally: worker_socket.close() - cdef send(self, bytes client_id, bytes data): + cdef send(self, bytes client_id, RemoteCommand command): + constants_inf.log( f'-> {str(command)}') + cdef bytes data = command.serialize() self._router.send_multipart([client_id, data]) # with self._context.socket(zmq.DEALER) as socket: diff --git a/Azaion.Inference/remote_command_inf.pxd b/Azaion.Inference/remote_command_inf.pxd index f9d4dd0..5880076 100644 --- a/Azaion.Inference/remote_command_inf.pxd +++ b/Azaion.Inference/remote_command_inf.pxd @@ -10,6 +10,8 @@ cdef enum CommandType: DATA_BYTES = 25 INFERENCE = 30 INFERENCE_DATA = 35 + INFERENCE_STATUS = 37 + INFERENCE_DONE = 38 STOP_INFERENCE = 40 AI_AVAILABILITY_CHECK = 80 AI_AVAILABILITY_RESULT = 85 diff --git a/Azaion.Inference/remote_command_inf.pyx b/Azaion.Inference/remote_command_inf.pyx index ea78496..6631083 100644 --- a/Azaion.Inference/remote_command_inf.pyx +++ b/Azaion.Inference/remote_command_inf.pyx @@ -19,6 +19,8 @@ cdef class RemoteCommand: 25: "DATA_BYTES", 30: "INFERENCE", 35: "INFERENCE_DATA", + 37: "INFERENCE_STATUS", + 38: "INFERENCE_DONE", 40: "STOP_INFERENCE", 80: "AI_AVAILABILITY_CHECK", 85: "AI_AVAILABILITY_RESULT",