make cython app exit correctly

This commit is contained in:
Alex Bezdieniezhnykh
2025-02-11 20:40:49 +02:00
parent 9973a16ada
commit 43cae0d03c
9 changed files with 87 additions and 65 deletions
+21 -19
View File
@@ -14,6 +14,7 @@ using Azaion.Common.DTO.Config;
using Azaion.Common.Events; using Azaion.Common.Events;
using Azaion.Common.Extensions; using Azaion.Common.Extensions;
using Azaion.Common.Services; using Azaion.Common.Services;
using Azaion.CommonSecurity.Services;
using LibVLCSharp.Shared; using LibVLCSharp.Shared;
using MediatR; using MediatR;
using Microsoft.WindowsAPICodePack.Dialogs; using Microsoft.WindowsAPICodePack.Dialogs;
@@ -40,6 +41,7 @@ public partial class Annotator
private readonly AnnotationService _annotationService; private readonly AnnotationService _annotationService;
private readonly IDbFactory _dbFactory; private readonly IDbFactory _dbFactory;
private readonly IInferenceService _inferenceService; private readonly IInferenceService _inferenceService;
private readonly IResourceLoader _resourceLoader;
private ObservableCollection<DetectionClass> AnnotationClasses { get; set; } = new(); private ObservableCollection<DetectionClass> AnnotationClasses { get; set; } = new();
private bool _suspendLayout; private bool _suspendLayout;
@@ -69,7 +71,8 @@ public partial class Annotator
ILogger<Annotator> logger, ILogger<Annotator> logger,
AnnotationService annotationService, AnnotationService annotationService,
IDbFactory dbFactory, IDbFactory dbFactory,
IInferenceService inferenceService) IInferenceService inferenceService,
IResourceLoader resourceLoader)
{ {
InitializeComponent(); InitializeComponent();
_appConfig = appConfig.Value; _appConfig = appConfig.Value;
@@ -83,6 +86,7 @@ public partial class Annotator
_annotationService = annotationService; _annotationService = annotationService;
_dbFactory = dbFactory; _dbFactory = dbFactory;
_inferenceService = inferenceService; _inferenceService = inferenceService;
_resourceLoader = resourceLoader;
Loaded += OnLoaded; Loaded += OnLoaded;
Closed += OnFormClosed; Closed += OnFormClosed;
@@ -397,6 +401,7 @@ public partial class Annotator
private void OnFormClosed(object? sender, EventArgs e) private void OnFormClosed(object? sender, EventArgs e)
{ {
_resourceLoader.StopPython();
MainCancellationSource.Cancel(); MainCancellationSource.Cancel();
DetectionCancellationSource.Cancel(); DetectionCancellationSource.Cancel();
_mediaPlayer.Stop(); _mediaPlayer.Stop();
@@ -494,20 +499,6 @@ public partial class Annotator
private (TimeSpan Time, List<Detection> Detections)? _previousDetection; private (TimeSpan Time, List<Detection> Detections)? _previousDetection;
private List<string> GetLvFiles()
{
return Dispatcher.Invoke(() =>
{
var source = LvFiles.ItemsSource as IEnumerable<MediaFileInfo>;
var items = source?.Skip(LvFiles.SelectedIndex)
.Take(Constants.DETECTION_BATCH_SIZE)
.Select(x => x.Path)
.ToList();
return items ?? new List<string>();
});
}
public void AutoDetect(object sender, RoutedEventArgs e) public void AutoDetect(object sender, RoutedEventArgs e)
{ {
if (IsInferenceNow) if (IsInferenceNow)
@@ -529,18 +520,29 @@ public partial class Annotator
var ct = DetectionCancellationSource.Token; var ct = DetectionCancellationSource.Token;
_ = Task.Run(async () => _ = Task.Run(async () =>
{ {
var files = GetLvFiles(); while (!ct.IsCancellationRequested)
while (files.Any() && !ct.IsCancellationRequested)
{ {
var files = new List<string>();
await Dispatcher.Invoke(async () => await Dispatcher.Invoke(async () =>
{ {
files = (LvFiles.ItemsSource as IEnumerable<MediaFileInfo>)?.Skip(LvFiles.SelectedIndex)
.Take(Constants.DETECTION_BATCH_SIZE)
.Select(x => x.Path)
.ToList();
await _mediator.Publish(new AnnotatorControlEvent(PlaybackControlEnum.Play), ct); await _mediator.Publish(new AnnotatorControlEvent(PlaybackControlEnum.Play), ct);
await ReloadAnnotations(); await ReloadAnnotations();
}); });
await _inferenceService.RunInference(files, async annotationImage => await ProcessDetection(annotationImage), ct); await _inferenceService.RunInference(files, async annotationImage => await ProcessDetection(annotationImage), ct);
files = GetLvFiles();
Dispatcher.Invoke(() => LvFiles.Items.Refresh()); Dispatcher.Invoke(() =>
{
if (LvFiles.SelectedIndex + files.Count >= LvFiles.Items.Count)
DetectionCancellationSource.Cancel();
LvFiles.SelectedIndex += files.Count;
LvFiles.Items.Refresh();
});
} }
Dispatcher.Invoke(() => Dispatcher.Invoke(() =>
{ {
+1 -1
View File
@@ -24,5 +24,5 @@ public class SecurityConstants
#endregion SocketClient #endregion SocketClient
public static string AzaionInferencePath = "azaion-inference.exe"; public const string AZAION_INFERENCE_PATH = "azaion-inference.exe";
} }
@@ -11,6 +11,7 @@ namespace Azaion.CommonSecurity.Services;
public interface IResourceLoader public interface IResourceLoader
{ {
MemoryStream LoadFileFromPython(string fileName); MemoryStream LoadFileFromPython(string fileName);
void StopPython();
} }
public interface IAuthProvider public interface IAuthProvider
@@ -49,11 +50,11 @@ public class PythonResourceLoader : IResourceLoader, IAuthProvider
using var process = new Process(); using var process = new Process();
process.StartInfo = new ProcessStartInfo process.StartInfo = new ProcessStartInfo
{ {
FileName = SecurityConstants.AzaionInferencePath, FileName = SecurityConstants.AZAION_INFERENCE_PATH,
Arguments = $"-e {credentials.Email} -p {credentials.Password} -f {apiConfig.ResourcesFolder}", Arguments = $"-e {credentials.Email} -p {credentials.Password} -f {apiConfig.ResourcesFolder}",
UseShellExecute = false, //UseShellExecute = false,
RedirectStandardOutput = true, //RedirectStandardOutput = true,
RedirectStandardError = true, // RedirectStandardError = true,
//CreateNoWindow = true //CreateNoWindow = true
}; };
@@ -62,18 +63,10 @@ public class PythonResourceLoader : IResourceLoader, IAuthProvider
process.Start(); process.Start();
} }
public async Task LoadFileFromApi(CancellationToken cancellationToken = default) public void StopPython()
{ {
var hardwareService = new HardwareService(); _dealer.SendFrame(MessagePackSerializer.Serialize(new RemoteCommand(CommandType.Exit)));
var hardwareInfo = hardwareService.GetHardware(); _dealer?.Close();
var encryptedStream = await _api.GetResource("azaion_inference.exe", _credentials.Password, hardwareInfo, cancellationToken);
var key = Security.MakeEncryptionKey(_credentials.Email, _credentials.Password, hardwareInfo.Hash);
var stream = new MemoryStream();
await encryptedStream.DecryptTo(stream, key, cancellationToken);
stream.Seek(0, SeekOrigin.Begin);
} }
public MemoryStream LoadFileFromPython(string fileName) public MemoryStream LoadFileFromPython(string fileName)
+1 -3
View File
@@ -137,7 +137,7 @@ cdef class Inference:
self._previous_annotation = None self._previous_annotation = None
v_input = cv2.VideoCapture(<str>video_name) v_input = cv2.VideoCapture(<str>video_name)
while v_input.isOpened(): while v_input.isOpened() and not self.stop_signal:
ret, frame = v_input.read() ret, frame = v_input.read()
if not ret or frame is None: if not ret or frame is None:
break break
@@ -186,11 +186,9 @@ cdef class Inference:
print(annotation.to_str(self.class_names)) print(annotation.to_str(self.class_names))
self.on_annotation(cmd, annotation) self.on_annotation(cmd, annotation)
cdef stop(self): cdef stop(self):
self.stop_signal = True self.stop_signal = True
cdef bint is_valid_annotation(self, Annotation annotation): cdef bint is_valid_annotation(self, Annotation annotation):
# No detections, invalid # No detections, invalid
if not annotation.detections: if not annotation.detections:
+12 -5
View File
@@ -1,6 +1,8 @@
import queue
import traceback import traceback
from queue import Queue from queue import Queue
cimport constants cimport constants
from threading import Thread
from api_client cimport ApiClient from api_client cimport ApiClient
from annotation cimport Annotation from annotation cimport Annotation
@@ -20,14 +22,14 @@ cdef class ParsedArguments:
cdef class CommandProcessor: cdef class CommandProcessor:
cdef ApiClient api_client cdef ApiClient api_client
cdef RemoteCommandHandler remote_handler cdef RemoteCommandHandler remote_handler
cdef object command_queue cdef object inference_queue
cdef bint running cdef bint running
cdef Inference inference cdef Inference inference
def __init__(self, args: ParsedArguments): def __init__(self, args: ParsedArguments):
self.api_client = ApiClient(args.email, args.password, args.folder) self.api_client = ApiClient(args.email, args.password, args.folder)
self.remote_handler = RemoteCommandHandler(self.on_command) self.remote_handler = RemoteCommandHandler(self.on_command)
self.command_queue = Queue(maxsize=constants.QUEUE_MAXSIZE) self.inference_queue = Queue(maxsize=constants.QUEUE_MAXSIZE)
self.remote_handler.start() self.remote_handler.start()
self.running = True self.running = True
model = self.api_client.load_ai_model() model = self.api_client.load_ai_model()
@@ -36,11 +38,14 @@ cdef class CommandProcessor:
def start(self): def start(self):
while self.running: while self.running:
try: try:
command = self.command_queue.get() command = self.inference_queue.get(timeout=0.5)
self.inference.run_inference(command) self.inference.run_inference(command)
self.remote_handler.send(command.client_id, <bytes>'DONE'.encode('utf-8')) self.remote_handler.send(command.client_id, <bytes>'DONE'.encode('utf-8'))
except queue.Empty:
continue
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
print('EXIT!')
cdef on_command(self, RemoteCommand command): cdef on_command(self, RemoteCommand command):
try: try:
@@ -50,11 +55,12 @@ cdef class CommandProcessor:
response = self.api_client.load_bytes(command.filename) response = self.api_client.load_bytes(command.filename)
self.remote_handler.send(command.client_id, response) self.remote_handler.send(command.client_id, response)
elif command.command_type == CommandType.INFERENCE: elif command.command_type == CommandType.INFERENCE:
self.command_queue.put(command) self.inference_queue.put(command)
elif command.command_type == CommandType.STOP_INFERENCE: elif command.command_type == CommandType.STOP_INFERENCE:
self.inference.stop() self.inference.stop()
elif command.command_type == CommandType.EXIT: elif command.command_type == CommandType.EXIT:
self.stop() t = Thread(target=self.stop) # non-block worker:
t.start()
else: else:
pass pass
except Exception as e: except Exception as e:
@@ -68,5 +74,6 @@ cdef class CommandProcessor:
self.remote_handler.send(cmd.client_id, data) self.remote_handler.send(cmd.client_id, data)
def stop(self): def stop(self):
self.inference.stop()
self.remote_handler.stop() self.remote_handler.stop()
self.running = False self.running = False
@@ -2,6 +2,7 @@ cdef class RemoteCommandHandler:
cdef object _context cdef object _context
cdef object _router cdef object _router
cdef object _dealer cdef object _dealer
cdef object _control
cdef object _shutdown_event cdef object _shutdown_event
cdef object _on_command cdef object _on_command
+42 -20
View File
@@ -9,7 +9,6 @@ cdef class RemoteCommandHandler:
def __init__(self, object on_command): def __init__(self, object on_command):
self._on_command = on_command self._on_command = on_command
self._context = zmq.Context.instance() self._context = zmq.Context.instance()
self._shutdown_event = Event()
self._router = self._context.socket(zmq.ROUTER) self._router = self._context.socket(zmq.ROUTER)
self._router.setsockopt(zmq.LINGER, 0) self._router.setsockopt(zmq.LINGER, 0)
@@ -19,6 +18,10 @@ cdef class RemoteCommandHandler:
self._dealer.setsockopt(zmq.LINGER, 0) self._dealer.setsockopt(zmq.LINGER, 0)
self._dealer.bind("inproc://backend") self._dealer.bind("inproc://backend")
self._control = self._context.socket(zmq.PAIR)
self._control.bind("inproc://control")
self._shutdown_event = Event()
self._proxy_thread = Thread(target=self._proxy_loop, daemon=True) self._proxy_thread = Thread(target=self._proxy_loop, daemon=True)
self._workers = [] self._workers = []
@@ -32,7 +35,13 @@ cdef class RemoteCommandHandler:
worker.start() worker.start()
cdef _proxy_loop(self): cdef _proxy_loop(self):
zmq.proxy(self._router, self._dealer) try:
zmq.proxy_steerable(self._router, self._dealer, control=self._control)
except zmq.error.ZMQError as e:
if self._shutdown_event.is_set():
print("Shutdown, exit proxy loop.")
else:
raise
cdef _worker_loop(self): cdef _worker_loop(self):
worker_socket = self._context.socket(zmq.DEALER) worker_socket = self._context.socket(zmq.DEALER)
@@ -40,20 +49,24 @@ cdef class RemoteCommandHandler:
worker_socket.connect("inproc://backend") worker_socket.connect("inproc://backend")
poller = zmq.Poller() poller = zmq.Poller()
poller.register(worker_socket, zmq.POLLIN) poller.register(worker_socket, zmq.POLLIN)
print('started receiver loop...') try:
while not self._shutdown_event.is_set():
try: while not self._shutdown_event.is_set():
socks = dict(poller.poll(500)) try:
if worker_socket in socks: socks = dict(poller.poll(500))
client_id, message = worker_socket.recv_multipart() if worker_socket in socks:
cmd = RemoteCommand.from_msgpack(<bytes> message) client_id, message = worker_socket.recv_multipart()
cmd.client_id = client_id cmd = RemoteCommand.from_msgpack(<bytes> message)
print(f'Received [{cmd}] from the client {client_id}') cmd.client_id = client_id
self._on_command(cmd) print(f'Received [{cmd}] from the client {client_id}')
except Exception as e: self._on_command(cmd)
print(f"Worker error: {e}") except Exception as e:
import traceback if not self._shutdown_event.is_set():
traceback.print_exc() print(f"Worker error: {e}")
import traceback
traceback.print_exc()
finally:
worker_socket.close()
cdef send(self, bytes client_id, bytes data): cdef send(self, bytes client_id, bytes data):
with self._context.socket(zmq.DEALER) as socket: with self._context.socket(zmq.DEALER) as socket:
@@ -63,7 +76,16 @@ cdef class RemoteCommandHandler:
cdef stop(self): cdef stop(self):
self._shutdown_event.set() self._shutdown_event.set()
time.sleep(0.5) try:
self._router.close() self._control.send(b"TERMINATE", flags=zmq.DONTWAIT)
self._dealer.close() except zmq.error.ZMQError:
self._context.term() pass
self._router.close(linger=0)
self._dealer.close(linger=0)
self._control.close(linger=0)
self._proxy_thread.join(timeout=2)
while any(w.is_alive() for w in self._workers):
time.sleep(0.1)
self._context.term()
+1 -1
View File
@@ -1 +1 @@
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiI5N2U5MWI2OC1hNmRlLTQ3YTgtOTgzYi0xOTU3YzViNDQ2MTkiLCJ1bmlxdWVfbmFtZSI6ImFkbWluLXJlbW90ZUBhemFpb24uY29tIiwicm9sZSI6IkFwaUFkbWluIiwibmJmIjoxNzM5MTk5MzM1LCJleHAiOjE3MzkyMTM3MzUsImlhdCI6MTczOTE5OTMzNSwiaXNzIjoiQXphaW9uQXBpIiwiYXVkIjoiQW5ub3RhdG9ycy9PcmFuZ2VQaS9BZG1pbnMifQ.HmfaeFn9T_eJuRZSjBV_EhiSB41ippBVPLRggC7gBZk eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiI5N2U5MWI2OC1hNmRlLTQ3YTgtOTgzYi0xOTU3YzViNDQ2MTkiLCJ1bmlxdWVfbmFtZSI6ImFkbWluLXJlbW90ZUBhemFpb24uY29tIiwicm9sZSI6IkFwaUFkbWluIiwibmJmIjoxNzM5Mjk0ODY2LCJleHAiOjE3MzkzMDkyNjYsImlhdCI6MTczOTI5NDg2NiwiaXNzIjoiQXphaW9uQXBpIiwiYXVkIjoiQW5ub3RhdG9ycy9PcmFuZ2VQaS9BZG1pbnMifQ.fp0YzE42mqtG2fd4BtaX2ZH-0-9YLXHPDqoAHSpfWjk
-1
View File
@@ -3,7 +3,6 @@ using System.Reflection;
using System.Windows; using System.Windows;
using System.Windows.Threading; using System.Windows.Threading;
using Azaion.Annotator; using Azaion.Annotator;
using Azaion.Annotator.Extensions;
using Azaion.Common.Database; using Azaion.Common.Database;
using Azaion.Common.DTO; using Azaion.Common.DTO;
using Azaion.Common.DTO.Config; using Azaion.Common.DTO.Config;