diff --git a/Azaion.Annotator/Annotator.xaml.cs b/Azaion.Annotator/Annotator.xaml.cs index 642e7c6..6407fe4 100644 --- a/Azaion.Annotator/Annotator.xaml.cs +++ b/Azaion.Annotator/Annotator.xaml.cs @@ -14,6 +14,7 @@ using Azaion.Common.DTO.Config; using Azaion.Common.Events; using Azaion.Common.Extensions; using Azaion.Common.Services; +using Azaion.CommonSecurity.Services; using LibVLCSharp.Shared; using MediatR; using Microsoft.WindowsAPICodePack.Dialogs; @@ -40,6 +41,7 @@ public partial class Annotator private readonly AnnotationService _annotationService; private readonly IDbFactory _dbFactory; private readonly IInferenceService _inferenceService; + private readonly IResourceLoader _resourceLoader; private ObservableCollection AnnotationClasses { get; set; } = new(); private bool _suspendLayout; @@ -69,7 +71,8 @@ public partial class Annotator ILogger logger, AnnotationService annotationService, IDbFactory dbFactory, - IInferenceService inferenceService) + IInferenceService inferenceService, + IResourceLoader resourceLoader) { InitializeComponent(); _appConfig = appConfig.Value; @@ -83,6 +86,7 @@ public partial class Annotator _annotationService = annotationService; _dbFactory = dbFactory; _inferenceService = inferenceService; + _resourceLoader = resourceLoader; Loaded += OnLoaded; Closed += OnFormClosed; @@ -397,6 +401,7 @@ public partial class Annotator private void OnFormClosed(object? sender, EventArgs e) { + _resourceLoader.StopPython(); MainCancellationSource.Cancel(); DetectionCancellationSource.Cancel(); _mediaPlayer.Stop(); @@ -494,20 +499,6 @@ public partial class Annotator private (TimeSpan Time, List Detections)? _previousDetection; - private List GetLvFiles() - { - return Dispatcher.Invoke(() => - { - var source = LvFiles.ItemsSource as IEnumerable; - var items = source?.Skip(LvFiles.SelectedIndex) - .Take(Constants.DETECTION_BATCH_SIZE) - .Select(x => x.Path) - .ToList(); - - return items ?? new List(); - }); - } - public void AutoDetect(object sender, RoutedEventArgs e) { if (IsInferenceNow) @@ -529,18 +520,29 @@ public partial class Annotator var ct = DetectionCancellationSource.Token; _ = Task.Run(async () => { - var files = GetLvFiles(); - while (files.Any() && !ct.IsCancellationRequested) + while (!ct.IsCancellationRequested) { + var files = new List(); await Dispatcher.Invoke(async () => { + files = (LvFiles.ItemsSource as IEnumerable)?.Skip(LvFiles.SelectedIndex) + .Take(Constants.DETECTION_BATCH_SIZE) + .Select(x => x.Path) + .ToList(); await _mediator.Publish(new AnnotatorControlEvent(PlaybackControlEnum.Play), ct); await ReloadAnnotations(); }); await _inferenceService.RunInference(files, async annotationImage => await ProcessDetection(annotationImage), ct); - files = GetLvFiles(); - Dispatcher.Invoke(() => LvFiles.Items.Refresh()); + + Dispatcher.Invoke(() => + { + if (LvFiles.SelectedIndex + files.Count >= LvFiles.Items.Count) + DetectionCancellationSource.Cancel(); + + LvFiles.SelectedIndex += files.Count; + LvFiles.Items.Refresh(); + }); } Dispatcher.Invoke(() => { diff --git a/Azaion.CommonSecurity/SecurityConstants.cs b/Azaion.CommonSecurity/SecurityConstants.cs index 8b68ad4..3f41ceb 100644 --- a/Azaion.CommonSecurity/SecurityConstants.cs +++ b/Azaion.CommonSecurity/SecurityConstants.cs @@ -24,5 +24,5 @@ public class SecurityConstants #endregion SocketClient - public static string AzaionInferencePath = "azaion-inference.exe"; + public const string AZAION_INFERENCE_PATH = "azaion-inference.exe"; } \ No newline at end of file diff --git a/Azaion.CommonSecurity/Services/PythonResourceLoader.cs b/Azaion.CommonSecurity/Services/PythonResourceLoader.cs index 5fe22a7..1cd5e99 100644 --- a/Azaion.CommonSecurity/Services/PythonResourceLoader.cs +++ b/Azaion.CommonSecurity/Services/PythonResourceLoader.cs @@ -11,6 +11,7 @@ namespace Azaion.CommonSecurity.Services; public interface IResourceLoader { MemoryStream LoadFileFromPython(string fileName); + void StopPython(); } public interface IAuthProvider @@ -49,11 +50,11 @@ public class PythonResourceLoader : IResourceLoader, IAuthProvider using var process = new Process(); process.StartInfo = new ProcessStartInfo { - FileName = SecurityConstants.AzaionInferencePath, + FileName = SecurityConstants.AZAION_INFERENCE_PATH, Arguments = $"-e {credentials.Email} -p {credentials.Password} -f {apiConfig.ResourcesFolder}", - UseShellExecute = false, - RedirectStandardOutput = true, - RedirectStandardError = true, + //UseShellExecute = false, + //RedirectStandardOutput = true, + // RedirectStandardError = true, //CreateNoWindow = true }; @@ -62,18 +63,10 @@ public class PythonResourceLoader : IResourceLoader, IAuthProvider process.Start(); } - public async Task LoadFileFromApi(CancellationToken cancellationToken = default) + public void StopPython() { - var hardwareService = new HardwareService(); - var hardwareInfo = hardwareService.GetHardware(); - - var encryptedStream = await _api.GetResource("azaion_inference.exe", _credentials.Password, hardwareInfo, cancellationToken); - - var key = Security.MakeEncryptionKey(_credentials.Email, _credentials.Password, hardwareInfo.Hash); - var stream = new MemoryStream(); - await encryptedStream.DecryptTo(stream, key, cancellationToken); - stream.Seek(0, SeekOrigin.Begin); - + _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit))); + _dealer?.Close(); } public MemoryStream LoadFileFromPython(string fileName) diff --git a/Azaion.Inference/inference.pyx b/Azaion.Inference/inference.pyx index bd94495..7106186 100644 --- a/Azaion.Inference/inference.pyx +++ b/Azaion.Inference/inference.pyx @@ -137,7 +137,7 @@ cdef class Inference: self._previous_annotation = None v_input = cv2.VideoCapture(video_name) - while v_input.isOpened(): + while v_input.isOpened() and not self.stop_signal: ret, frame = v_input.read() if not ret or frame is None: break @@ -186,11 +186,9 @@ cdef class Inference: print(annotation.to_str(self.class_names)) self.on_annotation(cmd, annotation) - cdef stop(self): self.stop_signal = True - cdef bint is_valid_annotation(self, Annotation annotation): # No detections, invalid if not annotation.detections: diff --git a/Azaion.Inference/main.pyx b/Azaion.Inference/main.pyx index 5d306f0..64bf79d 100644 --- a/Azaion.Inference/main.pyx +++ b/Azaion.Inference/main.pyx @@ -1,6 +1,8 @@ +import queue import traceback from queue import Queue cimport constants +from threading import Thread from api_client cimport ApiClient from annotation cimport Annotation @@ -20,14 +22,14 @@ cdef class ParsedArguments: cdef class CommandProcessor: cdef ApiClient api_client cdef RemoteCommandHandler remote_handler - cdef object command_queue + cdef object inference_queue cdef bint running cdef Inference inference def __init__(self, args: ParsedArguments): self.api_client = ApiClient(args.email, args.password, args.folder) self.remote_handler = RemoteCommandHandler(self.on_command) - self.command_queue = Queue(maxsize=constants.QUEUE_MAXSIZE) + self.inference_queue = Queue(maxsize=constants.QUEUE_MAXSIZE) self.remote_handler.start() self.running = True model = self.api_client.load_ai_model() @@ -36,11 +38,14 @@ cdef class CommandProcessor: def start(self): while self.running: try: - command = self.command_queue.get() + command = self.inference_queue.get(timeout=0.5) self.inference.run_inference(command) self.remote_handler.send(command.client_id, 'DONE'.encode('utf-8')) + except queue.Empty: + continue except Exception as e: traceback.print_exc() + print('EXIT!') cdef on_command(self, RemoteCommand command): try: @@ -50,11 +55,12 @@ cdef class CommandProcessor: response = self.api_client.load_bytes(command.filename) self.remote_handler.send(command.client_id, response) elif command.command_type == CommandType.INFERENCE: - self.command_queue.put(command) + self.inference_queue.put(command) elif command.command_type == CommandType.STOP_INFERENCE: self.inference.stop() elif command.command_type == CommandType.EXIT: - self.stop() + t = Thread(target=self.stop) # non-block worker: + t.start() else: pass except Exception as e: @@ -68,5 +74,6 @@ cdef class CommandProcessor: self.remote_handler.send(cmd.client_id, data) def stop(self): + self.inference.stop() self.remote_handler.stop() self.running = False diff --git a/Azaion.Inference/remote_command_handler.pxd b/Azaion.Inference/remote_command_handler.pxd index ff53f1f..b4aaba9 100644 --- a/Azaion.Inference/remote_command_handler.pxd +++ b/Azaion.Inference/remote_command_handler.pxd @@ -2,6 +2,7 @@ cdef class RemoteCommandHandler: cdef object _context cdef object _router cdef object _dealer + cdef object _control cdef object _shutdown_event cdef object _on_command diff --git a/Azaion.Inference/remote_command_handler.pyx b/Azaion.Inference/remote_command_handler.pyx index 00a4ec1..383ece9 100644 --- a/Azaion.Inference/remote_command_handler.pyx +++ b/Azaion.Inference/remote_command_handler.pyx @@ -9,7 +9,6 @@ cdef class RemoteCommandHandler: def __init__(self, object on_command): self._on_command = on_command self._context = zmq.Context.instance() - self._shutdown_event = Event() self._router = self._context.socket(zmq.ROUTER) self._router.setsockopt(zmq.LINGER, 0) @@ -19,6 +18,10 @@ cdef class RemoteCommandHandler: self._dealer.setsockopt(zmq.LINGER, 0) self._dealer.bind("inproc://backend") + self._control = self._context.socket(zmq.PAIR) + self._control.bind("inproc://control") + self._shutdown_event = Event() + self._proxy_thread = Thread(target=self._proxy_loop, daemon=True) self._workers = [] @@ -32,7 +35,13 @@ cdef class RemoteCommandHandler: worker.start() cdef _proxy_loop(self): - zmq.proxy(self._router, self._dealer) + try: + zmq.proxy_steerable(self._router, self._dealer, control=self._control) + except zmq.error.ZMQError as e: + if self._shutdown_event.is_set(): + print("Shutdown, exit proxy loop.") + else: + raise cdef _worker_loop(self): worker_socket = self._context.socket(zmq.DEALER) @@ -40,20 +49,24 @@ cdef class RemoteCommandHandler: worker_socket.connect("inproc://backend") poller = zmq.Poller() poller.register(worker_socket, zmq.POLLIN) - print('started receiver loop...') - while not self._shutdown_event.is_set(): - try: - socks = dict(poller.poll(500)) - if worker_socket in socks: - client_id, message = worker_socket.recv_multipart() - cmd = RemoteCommand.from_msgpack( message) - cmd.client_id = client_id - print(f'Received [{cmd}] from the client {client_id}') - self._on_command(cmd) - except Exception as e: - print(f"Worker error: {e}") - import traceback - traceback.print_exc() + try: + + while not self._shutdown_event.is_set(): + try: + socks = dict(poller.poll(500)) + if worker_socket in socks: + client_id, message = worker_socket.recv_multipart() + cmd = RemoteCommand.from_msgpack( message) + cmd.client_id = client_id + print(f'Received [{cmd}] from the client {client_id}') + self._on_command(cmd) + except Exception as e: + if not self._shutdown_event.is_set(): + print(f"Worker error: {e}") + import traceback + traceback.print_exc() + finally: + worker_socket.close() cdef send(self, bytes client_id, bytes data): with self._context.socket(zmq.DEALER) as socket: @@ -63,7 +76,16 @@ cdef class RemoteCommandHandler: cdef stop(self): self._shutdown_event.set() - time.sleep(0.5) - self._router.close() - self._dealer.close() - self._context.term() \ No newline at end of file + try: + self._control.send(b"TERMINATE", flags=zmq.DONTWAIT) + except zmq.error.ZMQError: + pass + self._router.close(linger=0) + self._dealer.close(linger=0) + self._control.close(linger=0) + + self._proxy_thread.join(timeout=2) + while any(w.is_alive() for w in self._workers): + time.sleep(0.1) + + self._context.term() diff --git a/Azaion.Inference/token b/Azaion.Inference/token index 6e6d30b..e441bc2 100644 --- a/Azaion.Inference/token +++ b/Azaion.Inference/token @@ -1 +1 @@ -eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiI5N2U5MWI2OC1hNmRlLTQ3YTgtOTgzYi0xOTU3YzViNDQ2MTkiLCJ1bmlxdWVfbmFtZSI6ImFkbWluLXJlbW90ZUBhemFpb24uY29tIiwicm9sZSI6IkFwaUFkbWluIiwibmJmIjoxNzM5MTk5MzM1LCJleHAiOjE3MzkyMTM3MzUsImlhdCI6MTczOTE5OTMzNSwiaXNzIjoiQXphaW9uQXBpIiwiYXVkIjoiQW5ub3RhdG9ycy9PcmFuZ2VQaS9BZG1pbnMifQ.HmfaeFn9T_eJuRZSjBV_EhiSB41ippBVPLRggC7gBZk \ No newline at end of file +eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiI5N2U5MWI2OC1hNmRlLTQ3YTgtOTgzYi0xOTU3YzViNDQ2MTkiLCJ1bmlxdWVfbmFtZSI6ImFkbWluLXJlbW90ZUBhemFpb24uY29tIiwicm9sZSI6IkFwaUFkbWluIiwibmJmIjoxNzM5Mjk0ODY2LCJleHAiOjE3MzkzMDkyNjYsImlhdCI6MTczOTI5NDg2NiwiaXNzIjoiQXphaW9uQXBpIiwiYXVkIjoiQW5ub3RhdG9ycy9PcmFuZ2VQaS9BZG1pbnMifQ.fp0YzE42mqtG2fd4BtaX2ZH-0-9YLXHPDqoAHSpfWjk \ No newline at end of file diff --git a/Azaion.Suite/App.xaml.cs b/Azaion.Suite/App.xaml.cs index 8b122f4..84c0105 100644 --- a/Azaion.Suite/App.xaml.cs +++ b/Azaion.Suite/App.xaml.cs @@ -3,7 +3,6 @@ using System.Reflection; using System.Windows; using System.Windows.Threading; using Azaion.Annotator; -using Azaion.Annotator.Extensions; using Azaion.Common.Database; using Azaion.Common.DTO; using Azaion.Common.DTO.Config;