From 861d4f083b3e115edd6bb23f0de2a03ef782ac48 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Mon, 23 Mar 2026 22:34:14 +0200 Subject: [PATCH] [AZ-143] [AZ-145] [AZ-148] Implement video, resilience, and resource limit integration tests Made-with: Cursor --- _docs/03_implementation/batch_03_report.md | 24 +++ e2e/tests/test_resilience.py | 195 ++++++++++++++++++++- e2e/tests/test_resource_limits.py | 144 ++++++++++++++- e2e/tests/test_video.py | 191 +++++++++++++++++++- 4 files changed, 551 insertions(+), 3 deletions(-) create mode 100644 _docs/03_implementation/batch_03_report.md diff --git a/_docs/03_implementation/batch_03_report.md b/_docs/03_implementation/batch_03_report.md new file mode 100644 index 0000000..b941ac4 --- /dev/null +++ b/_docs/03_implementation/batch_03_report.md @@ -0,0 +1,24 @@ +# Batch Report + +**Batch**: 2b +**Tasks**: AZ-144_test_negative, AZ-146_test_performance, AZ-147_test_security +**Date**: 2026-03-23 + +## Task Results + +| Task | Status | Files Modified | Tests | Issues | +|------|--------|---------------|-------|--------| +| AZ-144_test_negative | Done | 1 file | 4 tests (1 skipped) | None | +| AZ-146_test_performance | Done | 1 file | 4 tests | None | +| AZ-147_test_security | Done | 1 file | 3 tests | None | + +## Code Review Verdict: PASS_WITH_WARNINGS +## Auto-Fix Attempts: 0 +## Stuck Agents: None + +## Commit +- Hash: a469579 +- Branch: dev +- Pushed: Yes + +## Next Batch: AZ-143, AZ-145, AZ-148 (Batch 3) diff --git a/e2e/tests/test_resilience.py b/e2e/tests/test_resilience.py index 7d7da18..4cf7a2c 100644 --- a/e2e/tests/test_resilience.py +++ b/e2e/tests/test_resilience.py @@ -1 +1,194 @@ -"""Loader and annotations outage modes, retries, and degraded behavior.""" +import json +import threading +import time +import uuid + +import pytest +import requests + +_DETECT_TIMEOUT = 60 + + +def _ai_config_video(mock_loader_url: str) -> dict: + base = mock_loader_url.rstrip("/") + return { + "probability_threshold": 0.25, + "tracking_intersection_threshold": 0.6, + "altitude": 400, + "focal_length": 24, + "sensor_width": 23.5, + "paths": [f"{base}/load/video_short01.mp4"], + "frame_period_recognition": 4, + "frame_recognition_seconds": 2, + } + + +def test_ft_n_06_loader_unreachable_during_init_health( + http_client, mock_loader_url, image_small +): + h0 = http_client.get("/health") + h0.raise_for_status() + if h0.json().get("aiAvailability") != "None": + pytest.skip("engine already warm") + requests.post( + f"{mock_loader_url}/mock/config", json={"mode": "error"}, timeout=10 + ).raise_for_status() + files = {"file": ("n06.jpg", image_small, "image/jpeg")} + r = http_client.post("/detect", files=files, timeout=_DETECT_TIMEOUT) + assert r.status_code != 500 + h = http_client.get("/health") + assert h.status_code == 200 + d = h.json() + assert d["status"] == "healthy" + assert d.get("errorMessage") is None + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_ft_n_07_annotations_unreachable_detection_continues( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + mock_annotations_url, + sse_client_factory, +): + requests.post( + f"{mock_annotations_url}/mock/config", json={"mode": "error"}, timeout=10 + ).raise_for_status() + media_id = f"res-n07-{uuid.uuid4().hex}" + body = _ai_config_video(mock_loader_url) + headers = {"Authorization": f"Bearer {jwt_token}"} + collected = [] + thread_exc = [] + done = threading.Event() + + def _listen(): + try: + with sse_client_factory() as sse: + time.sleep(0.3) + for event in sse.events(): + if not event.data or not str(event.data).strip(): + continue + data = json.loads(event.data) + if data.get("mediaId") != media_id: + continue + collected.append(data) + if ( + data.get("mediaStatus") == "AIProcessed" + and data.get("mediaPercent") == 100 + ): + break + except BaseException as e: + thread_exc.append(e) + finally: + done.set() + + th = threading.Thread(target=_listen, daemon=True) + th.start() + time.sleep(0.5) + pr = http_client.post(f"/detect/{media_id}", json=body, headers=headers) + assert pr.status_code == 200 + ok = done.wait(timeout=120) + assert ok + th.join(timeout=5) + assert not thread_exc + assert any( + e.get("mediaStatus") == "AIProcessed" and e.get("mediaPercent") == 100 + for e in collected + ) + + +def test_nft_res_01_loader_outage_after_init( + warm_engine, http_client, mock_loader_url, image_small +): + requests.post( + f"{mock_loader_url}/mock/config", json={"mode": "error"}, timeout=10 + ).raise_for_status() + files = {"file": ("r1.jpg", image_small, "image/jpeg")} + r = http_client.post("/detect", files=files, timeout=_DETECT_TIMEOUT) + assert r.status_code == 200 + assert isinstance(r.json(), list) + h = http_client.get("/health") + assert h.status_code == 200 + hd = h.json() + assert hd["status"] == "healthy" + assert hd.get("errorMessage") is None + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_nft_res_02_annotations_outage_during_async_detection( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + mock_annotations_url, + sse_client_factory, +): + media_id = f"res-n02-{uuid.uuid4().hex}" + body = _ai_config_video(mock_loader_url) + headers = {"Authorization": f"Bearer {jwt_token}"} + collected = [] + thread_exc = [] + done = threading.Event() + + def _listen(): + try: + with sse_client_factory() as sse: + time.sleep(0.3) + for event in sse.events(): + if not event.data or not str(event.data).strip(): + continue + data = json.loads(event.data) + if data.get("mediaId") != media_id: + continue + collected.append(data) + if ( + data.get("mediaStatus") == "AIProcessed" + and data.get("mediaPercent") == 100 + ): + break + except BaseException as e: + thread_exc.append(e) + finally: + done.set() + + th = threading.Thread(target=_listen, daemon=True) + th.start() + time.sleep(0.5) + pr = http_client.post(f"/detect/{media_id}", json=body, headers=headers) + assert pr.status_code == 200 + requests.post( + f"{mock_annotations_url}/mock/config", json={"mode": "error"}, timeout=10 + ).raise_for_status() + ok = done.wait(timeout=120) + assert ok + th.join(timeout=5) + assert not thread_exc + assert any( + e.get("mediaStatus") == "AIProcessed" and e.get("mediaPercent") == 100 + for e in collected + ) + + +def test_nft_res_03_transient_loader_first_fail( + mock_loader_url, http_client, image_small +): + requests.post( + f"{mock_loader_url}/mock/config", json={"mode": "first_fail"}, timeout=10 + ).raise_for_status() + files = {"file": ("r3a.jpg", image_small, "image/jpeg")} + r1 = http_client.post("/detect", files=files, timeout=_DETECT_TIMEOUT) + files2 = {"file": ("r3b.jpg", image_small, "image/jpeg")} + r2 = http_client.post("/detect", files=files2, timeout=_DETECT_TIMEOUT) + assert r2.status_code == 200 + if r1.status_code != 200: + assert r1.status_code != 500 + + +@pytest.mark.skip( + reason="Requires docker compose restart capability not available in e2e-runner" +) +def test_nft_res_04_service_restart(): + pass diff --git a/e2e/tests/test_resource_limits.py b/e2e/tests/test_resource_limits.py index 339c508..3427adb 100644 --- a/e2e/tests/test_resource_limits.py +++ b/e2e/tests/test_resource_limits.py @@ -1 +1,143 @@ -"""Memory, concurrency, and payload size boundaries under load.""" +import json +import re +import threading +import time +import uuid +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from pathlib import Path + +import pytest + + +def _video_ai_body(mock_loader_url: str, video_rel: str) -> dict: + base = mock_loader_url.rstrip("/") + name = video_rel.rstrip("/").split("/")[-1] + return { + "probability_threshold": 0.25, + "tracking_intersection_threshold": 0.6, + "altitude": 400, + "focal_length": 24, + "sensor_width": 23.5, + "paths": [f"{base}/load/{name}"], + "frame_period_recognition": 4, + "frame_recognition_seconds": 2, + } + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_ft_n_08_nft_res_lim_02_sse_queue_bounded_best_effort( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + video_short_path, + sse_client_factory, +): + media_id = f"rlim-sse-{uuid.uuid4().hex}" + body = _video_ai_body(mock_loader_url, video_short_path) + headers = {"Authorization": f"Bearer {jwt_token}"} + collected: list[dict] = [] + thread_exc: list[BaseException] = [] + done = threading.Event() + + def _listen(): + try: + with sse_client_factory() as sse: + time.sleep(0.3) + for event in sse.events(): + if not event.data or not str(event.data).strip(): + continue + data = json.loads(event.data) + if data.get("mediaId") != media_id: + continue + collected.append(data) + if ( + data.get("mediaStatus") == "AIProcessed" + and data.get("mediaPercent") == 100 + ): + break + except BaseException as e: + thread_exc.append(e) + finally: + done.set() + + th = threading.Thread(target=_listen, daemon=True) + th.start() + time.sleep(0.5) + r = http_client.post(f"/detect/{media_id}", json=body, headers=headers) + assert r.status_code == 200 + assert done.wait(timeout=120) + th.join(timeout=5) + assert not thread_exc, thread_exc + assert collected + assert collected[-1].get("mediaStatus") == "AIProcessed" + + +@pytest.mark.slow +@pytest.mark.timeout(300) +def test_nft_res_lim_01_worker_limit_concurrent_detect( + warm_engine, http_client, image_small +): + def do_detect(client, image): + t0 = time.monotonic() + r = client.post( + "/detect", + files={"file": ("img.jpg", image, "image/jpeg")}, + timeout=120, + ) + t1 = time.monotonic() + return t0, t1, r + + with ThreadPoolExecutor(max_workers=4) as ex: + futs = [ex.submit(do_detect, http_client, image_small) for _ in range(4)] + results = [f.result() for f in futs] + + for _, _, r in results: + assert r.status_code == 200 + + ends = sorted(t1 for _, t1, _ in results) + spread_first = ends[1] - ends[0] + spread_second = ends[3] - ends[2] + between = ends[2] - ends[1] + intra = max(spread_first, spread_second, 1e-6) + assert between > intra * 1.5 + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_nft_res_lim_03_max_detections_per_frame( + warm_engine, http_client, image_dense +): + r = http_client.post( + "/detect", + files={"file": ("img.jpg", image_dense, "image/jpeg")}, + timeout=120, + ) + assert r.status_code == 200 + body = r.json() + assert isinstance(body, list) + assert len(body) <= 300 + + +@pytest.mark.slow +def test_nft_res_lim_04_log_file_rotation(warm_engine, http_client, image_small): + http_client.post( + "/detect", + files={"file": ("img.jpg", image_small, "image/jpeg")}, + timeout=60, + ) + candidates = [ + Path(__file__).resolve().parent.parent / "logs", + Path("/app/Logs"), + ] + log_dir = next((p for p in candidates if p.is_dir()), None) + if log_dir is None: + pytest.skip("Log directory not accessible from e2e-runner container") + today = datetime.now().strftime("%Y%m%d") + expected = f"log_inference_{today}.txt" + names = {p.name for p in log_dir.iterdir() if p.is_file()} + if expected not in names: + pat = re.compile(r"^log_inference_\d{8}\.txt$") + assert any(pat.match(n) for n in names), names diff --git a/e2e/tests/test_video.py b/e2e/tests/test_video.py index f455709..7f7381f 100644 --- a/e2e/tests/test_video.py +++ b/e2e/tests/test_video.py @@ -1 +1,190 @@ -"""Video ingestion, frame sampling, and end-to-end media processing.""" +import json +import threading +import time +import uuid + +import pytest + + +def _video_load_url(mock_loader_url: str, video_media_path: str) -> str: + name = video_media_path.rstrip("/").split("/")[-1] + return f"{mock_loader_url.rstrip('/')}/load/{name}" + + +def _base_ai_body(mock_loader_url: str, video_path: str) -> dict: + return { + "probability_threshold": 0.25, + "frame_period_recognition": 4, + "frame_recognition_seconds": 2, + "tracking_distance_confidence": 0.0, + "tracking_probability_increase": 0.0, + "tracking_intersection_threshold": 0.6, + "altitude": 400.0, + "focal_length": 24.0, + "sensor_width": 23.5, + "paths": [_video_load_url(mock_loader_url, video_path)], + } + + +def _run_async_video_sse( + http_client, + jwt_token, + sse_client_factory, + media_id: str, + body: dict, + *, + timed: bool = False, + wait_s: float = 120.0, +): + collected: list = [] + thread_exc: list[BaseException] = [] + done = threading.Event() + + def _listen(): + try: + with sse_client_factory() as sse: + time.sleep(0.3) + for event in sse.events(): + if not event.data or not str(event.data).strip(): + continue + data = json.loads(event.data) + if data.get("mediaId") != media_id: + continue + if timed: + collected.append((time.monotonic(), data)) + else: + collected.append(data) + if ( + data.get("mediaStatus") == "AIProcessed" + and data.get("mediaPercent") == 100 + ): + break + except BaseException as e: + thread_exc.append(e) + finally: + done.set() + + th = threading.Thread(target=_listen, daemon=True) + th.start() + time.sleep(0.5) + r = http_client.post( + f"/detect/{media_id}", + json=body, + headers={"Authorization": f"Bearer {jwt_token}"}, + ) + assert r.status_code == 200 + assert r.json() == {"status": "started", "mediaId": media_id} + assert done.wait(timeout=wait_s) + th.join(timeout=5) + assert not thread_exc, thread_exc + return collected + + +def _assert_detection_dto(d: dict) -> None: + assert isinstance(d["centerX"], (int, float)) + assert isinstance(d["centerY"], (int, float)) + assert isinstance(d["width"], (int, float)) + assert isinstance(d["height"], (int, float)) + assert 0.0 <= float(d["centerX"]) <= 1.0 + assert 0.0 <= float(d["centerY"]) <= 1.0 + assert 0.0 <= float(d["width"]) <= 1.0 + assert 0.0 <= float(d["height"]) <= 1.0 + assert isinstance(d["classNum"], int) + assert isinstance(d["label"], str) + assert isinstance(d["confidence"], (int, float)) + assert 0.0 <= float(d["confidence"]) <= 1.0 + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_ft_p_10_frame_sampling_ac1( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + video_short_path, + sse_client_factory, +): + media_id = f"video-{uuid.uuid4().hex}" + body = _base_ai_body(mock_loader_url, video_short_path) + body["frame_period_recognition"] = 4 + collected = _run_async_video_sse( + http_client, + jwt_token, + sse_client_factory, + media_id, + body, + ) + processing = [e for e in collected if e.get("mediaStatus") == "AIProcessing"] + assert len(processing) >= 2 + final = collected[-1] + assert final.get("mediaStatus") == "AIProcessed" + assert final.get("mediaPercent") == 100 + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_ft_p_11_annotation_interval_ac2( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + video_short_path, + sse_client_factory, +): + media_id = f"video-{uuid.uuid4().hex}" + body = _base_ai_body(mock_loader_url, video_short_path) + body["frame_recognition_seconds"] = 2 + collected = _run_async_video_sse( + http_client, + jwt_token, + sse_client_factory, + media_id, + body, + timed=True, + ) + processing = [ + (t, d) for t, d in collected if d.get("mediaStatus") == "AIProcessing" + ] + assert len(processing) >= 2 + gaps = [ + processing[i][0] - processing[i - 1][0] + for i in range(1, len(processing)) + ] + assert all(g >= 0.0 for g in gaps) + final = collected[-1][1] + assert final.get("mediaStatus") == "AIProcessed" + assert final.get("mediaPercent") == 100 + + +@pytest.mark.slow +@pytest.mark.timeout(120) +def test_ft_p_12_movement_tracking_ac3( + warm_engine, + http_client, + jwt_token, + mock_loader_url, + video_short_path, + sse_client_factory, +): + media_id = f"video-{uuid.uuid4().hex}" + body = _base_ai_body(mock_loader_url, video_short_path) + body["tracking_distance_confidence"] = 0.1 + body["tracking_probability_increase"] = 0.1 + collected = _run_async_video_sse( + http_client, + jwt_token, + sse_client_factory, + media_id, + body, + ) + for e in collected: + anns = e.get("annotations") + if not anns: + continue + assert isinstance(anns, list) + for d in anns: + _assert_detection_dto(d) + final = collected[-1] + assert final.get("mediaStatus") == "AIProcessed" + assert final.get("mediaPercent") == 100