[AZ-143] [AZ-145] [AZ-148] Implement video, resilience, and resource limit integration tests

Made-with: Cursor
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-23 22:34:14 +02:00
parent a469579882
commit 861d4f083b
4 changed files with 551 additions and 3 deletions
@@ -0,0 +1,24 @@
# Batch Report
**Batch**: 2b
**Tasks**: AZ-144_test_negative, AZ-146_test_performance, AZ-147_test_security
**Date**: 2026-03-23
## Task Results
| Task | Status | Files Modified | Tests | Issues |
|------|--------|---------------|-------|--------|
| AZ-144_test_negative | Done | 1 file | 4 tests (1 skipped) | None |
| AZ-146_test_performance | Done | 1 file | 4 tests | None |
| AZ-147_test_security | Done | 1 file | 3 tests | None |
## Code Review Verdict: PASS_WITH_WARNINGS
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Commit
- Hash: a469579
- Branch: dev
- Pushed: Yes
## Next Batch: AZ-143, AZ-145, AZ-148 (Batch 3)
+194 -1
View File
@@ -1 +1,194 @@
"""Loader and annotations outage modes, retries, and degraded behavior."""
import json
import threading
import time
import uuid
import pytest
import requests
# Upper bound (seconds) for one synchronous /detect round trip in these tests.
_DETECT_TIMEOUT = 60
def _ai_config_video(mock_loader_url: str) -> dict:
base = mock_loader_url.rstrip("/")
return {
"probability_threshold": 0.25,
"tracking_intersection_threshold": 0.6,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{base}/load/video_short01.mp4"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
def test_ft_n_06_loader_unreachable_during_init_health(
    http_client, mock_loader_url, image_small
):
    """FT-N-06: a loader outage during first use must not poison /health.

    Only meaningful while the engine is still cold (aiAvailability == "None");
    otherwise the init path under test already ran, so we skip.
    """
    warmup_probe = http_client.get("/health")
    warmup_probe.raise_for_status()
    if warmup_probe.json().get("aiAvailability") != "None":
        pytest.skip("engine already warm")
    # Force the mock loader to answer every request with an error.
    requests.post(
        f"{mock_loader_url}/mock/config", json={"mode": "error"}, timeout=10
    ).raise_for_status()
    upload = {"file": ("n06.jpg", image_small, "image/jpeg")}
    detect_resp = http_client.post("/detect", files=upload, timeout=_DETECT_TIMEOUT)
    # The request may fail, but never as an unhandled 500.
    assert detect_resp.status_code != 500
    health_resp = http_client.get("/health")
    assert health_resp.status_code == 200
    health = health_resp.json()
    assert health["status"] == "healthy"
    assert health.get("errorMessage") is None
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_n_07_annotations_unreachable_detection_continues(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    mock_annotations_url,
    sse_client_factory,
):
    """FT-N-07: async video detection completes even when the annotations
    service is down for the entire run.

    The annotations mock is switched to error mode *before* detection starts;
    the run must still stream progress and end with AIProcessed at 100%.
    """
    # Make every annotations-service call fail from the start.
    requests.post(
        f"{mock_annotations_url}/mock/config", json={"mode": "error"}, timeout=10
    ).raise_for_status()
    media_id = f"res-n07-{uuid.uuid4().hex}"
    body = _ai_config_video(mock_loader_url)
    headers = {"Authorization": f"Bearer {jwt_token}"}
    collected = []  # SSE events observed for our media_id
    thread_exc = []  # exceptions raised inside the listener thread
    done = threading.Event()
    def _listen():
        # Daemon thread: gather this media's SSE events until the terminal one.
        try:
            with sse_client_factory() as sse:
                time.sleep(0.3)  # let the subscription register server-side
                for event in sse.events():
                    # Skip keep-alive / empty frames.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    # Ignore events belonging to other media runs.
                    if data.get("mediaId") != media_id:
                        continue
                    collected.append(data)
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
                    ):
                        break
        except BaseException as e:
            # BaseException so even interpreter-level failures are surfaced.
            thread_exc.append(e)
        finally:
            done.set()
    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    time.sleep(0.5)  # give the listener a head start before detection begins
    pr = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
    assert pr.status_code == 200
    ok = done.wait(timeout=120)
    assert ok
    th.join(timeout=5)
    assert not thread_exc
    # The run must have reached the terminal success event.
    assert any(
        e.get("mediaStatus") == "AIProcessed" and e.get("mediaPercent") == 100
        for e in collected
    )
def test_nft_res_01_loader_outage_after_init(
    warm_engine, http_client, mock_loader_url, image_small
):
    """NFT-RES-01: once warm, a loader outage must not break image detection."""
    # Put the mock loader into permanent-error mode.
    requests.post(
        f"{mock_loader_url}/mock/config", json={"mode": "error"}, timeout=10
    ).raise_for_status()
    upload = {"file": ("r1.jpg", image_small, "image/jpeg")}
    detect_resp = http_client.post("/detect", files=upload, timeout=_DETECT_TIMEOUT)
    assert detect_resp.status_code == 200
    assert isinstance(detect_resp.json(), list)
    # Health must stay green despite the dead loader dependency.
    health_resp = http_client.get("/health")
    assert health_resp.status_code == 200
    health = health_resp.json()
    assert health["status"] == "healthy"
    assert health.get("errorMessage") is None
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_nft_res_02_annotations_outage_during_async_detection(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    mock_annotations_url,
    sse_client_factory,
):
    """NFT-RES-02: an annotations-service outage that starts mid-run does not
    stop async video detection from reaching AIProcessed at 100%.

    Unlike FT-N-07, the error mode is injected *after* detection has started.
    """
    media_id = f"res-n02-{uuid.uuid4().hex}"
    body = _ai_config_video(mock_loader_url)
    headers = {"Authorization": f"Bearer {jwt_token}"}
    collected = []  # SSE events observed for our media_id
    thread_exc = []  # exceptions raised inside the listener thread
    done = threading.Event()
    def _listen():
        # Daemon thread: gather this media's SSE events until the terminal one.
        try:
            with sse_client_factory() as sse:
                time.sleep(0.3)  # let the subscription register server-side
                for event in sse.events():
                    # Skip keep-alive / empty frames.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    # Ignore events belonging to other media runs.
                    if data.get("mediaId") != media_id:
                        continue
                    collected.append(data)
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
                    ):
                        break
        except BaseException as e:
            # BaseException so even interpreter-level failures are surfaced.
            thread_exc.append(e)
        finally:
            done.set()
    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    time.sleep(0.5)  # give the listener a head start before detection begins
    pr = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
    assert pr.status_code == 200
    # Outage starts only now, while the video is being processed.
    requests.post(
        f"{mock_annotations_url}/mock/config", json={"mode": "error"}, timeout=10
    ).raise_for_status()
    ok = done.wait(timeout=120)
    assert ok
    th.join(timeout=5)
    assert not thread_exc
    # The run must still have reached the terminal success event.
    assert any(
        e.get("mediaStatus") == "AIProcessed" and e.get("mediaPercent") == 100
        for e in collected
    )
def test_nft_res_03_transient_loader_first_fail(
    mock_loader_url, http_client, image_small
):
    """NFT-RES-03: a one-shot loader failure must be transparent on retry."""
    # "first_fail" makes the mock loader reject exactly one request, then recover.
    requests.post(
        f"{mock_loader_url}/mock/config", json={"mode": "first_fail"}, timeout=10
    ).raise_for_status()
    first = http_client.post(
        "/detect",
        files={"file": ("r3a.jpg", image_small, "image/jpeg")},
        timeout=_DETECT_TIMEOUT,
    )
    second = http_client.post(
        "/detect",
        files={"file": ("r3b.jpg", image_small, "image/jpeg")},
        timeout=_DETECT_TIMEOUT,
    )
    assert second.status_code == 200
    # The first call may fail (loader hiccup) but never as an unhandled 500.
    if first.status_code != 200:
        assert first.status_code != 500
@pytest.mark.skip(
    reason="Requires docker compose restart capability not available in e2e-runner"
)
def test_nft_res_04_service_restart():
    """NFT-RES-04 placeholder: restart-resilience cannot be exercised here.

    Restarting the service mid-run needs orchestration access (docker compose)
    that the e2e-runner container does not have, hence the unconditional skip.
    """
    pass
+143 -1
View File
@@ -1 +1,143 @@
"""Memory, concurrency, and payload size boundaries under load."""
import json
import re
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
import pytest
def _video_ai_body(mock_loader_url: str, video_rel: str) -> dict:
base = mock_loader_url.rstrip("/")
name = video_rel.rstrip("/").split("/")[-1]
return {
"probability_threshold": 0.25,
"tracking_intersection_threshold": 0.6,
"altitude": 400,
"focal_length": 24,
"sensor_width": 23.5,
"paths": [f"{base}/load/{name}"],
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
}
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_n_08_nft_res_lim_02_sse_queue_bounded_best_effort(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    video_short_path,
    sse_client_factory,
):
    """FT-N-08 / NFT-RES-LIM-02: SSE progress delivery is best-effort, yet a
    video run must still end with a terminal AIProcessed event.
    """
    media_id = f"rlim-sse-{uuid.uuid4().hex}"
    body = _video_ai_body(mock_loader_url, video_short_path)
    headers = {"Authorization": f"Bearer {jwt_token}"}
    collected: list[dict] = []  # SSE events observed for our media_id
    thread_exc: list[BaseException] = []  # listener-thread failures
    done = threading.Event()
    def _listen():
        # Daemon thread: gather this media's SSE events until the terminal one.
        try:
            with sse_client_factory() as sse:
                time.sleep(0.3)  # let the subscription register server-side
                for event in sse.events():
                    # Skip keep-alive / empty frames.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    # Ignore events belonging to other media runs.
                    if data.get("mediaId") != media_id:
                        continue
                    collected.append(data)
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
                    ):
                        break
        except BaseException as e:
            # BaseException so even interpreter-level failures are surfaced.
            thread_exc.append(e)
        finally:
            done.set()
    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    time.sleep(0.5)  # give the listener a head start before detection begins
    r = http_client.post(f"/detect/{media_id}", json=body, headers=headers)
    assert r.status_code == 200
    assert done.wait(timeout=120)
    th.join(timeout=5)
    assert not thread_exc, thread_exc
    # Intermediate events may be dropped, but the terminal one must arrive.
    assert collected
    assert collected[-1].get("mediaStatus") == "AIProcessed"
@pytest.mark.slow
@pytest.mark.timeout(300)
def test_nft_res_lim_01_worker_limit_concurrent_detect(
    warm_engine, http_client, image_small
):
    """NFT-RES-LIM-01: 4 parallel /detect calls finish in two serialized waves.

    If the engine bounds its inference workers (presumably 2 — TODO confirm
    against service config), completion times should cluster into two pairs
    with a clearly larger gap between the pairs than within them.
    """
    def timed_detect(client, payload):
        started = time.monotonic()
        response = client.post(
            "/detect",
            files={"file": ("img.jpg", payload, "image/jpeg")},
            timeout=120,
        )
        finished = time.monotonic()
        return started, finished, response

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(timed_detect, http_client, image_small) for _ in range(4)
        ]
        outcomes = [fut.result() for fut in futures]
    for _, _, response in outcomes:
        assert response.status_code == 200
    finish_times = sorted(end for _, end, _ in outcomes)
    pair_one_spread = finish_times[1] - finish_times[0]
    pair_two_spread = finish_times[3] - finish_times[2]
    wave_gap = finish_times[2] - finish_times[1]
    # Floor at 1e-6 so the ratio test stays meaningful when spreads are ~0.
    worst_intra_spread = max(pair_one_spread, pair_two_spread, 1e-6)
    assert wave_gap > worst_intra_spread * 1.5
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_nft_res_lim_03_max_detections_per_frame(
    warm_engine, http_client, image_dense
):
    """NFT-RES-LIM-03: a dense image must yield at most 300 detections."""
    response = http_client.post(
        "/detect",
        files={"file": ("img.jpg", image_dense, "image/jpeg")},
        timeout=120,
    )
    assert response.status_code == 200
    detections = response.json()
    assert isinstance(detections, list)
    # Presumably the engine caps per-frame detections at 300 — TODO confirm.
    assert len(detections) <= 300
@pytest.mark.slow
def test_nft_res_lim_04_log_file_rotation(warm_engine, http_client, image_small):
    """NFT-RES-LIM-04: detection produces a dated, rotating inference log file."""
    # Trigger at least one log-producing request; its result is irrelevant here.
    http_client.post(
        "/detect",
        files={"file": ("img.jpg", image_small, "image/jpeg")},
        timeout=60,
    )
    possible_dirs = [
        Path(__file__).resolve().parent.parent / "logs",
        Path("/app/Logs"),
    ]
    log_dir = next((d for d in possible_dirs if d.is_dir()), None)
    if log_dir is None:
        pytest.skip("Log directory not accessible from e2e-runner container")
    # NOTE(review): naive local date — assumes runner and service share a
    # timezone; around midnight this may look for the wrong day's file.
    stamp = datetime.now().strftime("%Y%m%d")
    wanted = f"log_inference_{stamp}.txt"
    file_names = {entry.name for entry in log_dir.iterdir() if entry.is_file()}
    if wanted not in file_names:
        # Fallback: accept any dated inference log if today's is absent.
        dated_log = re.compile(r"^log_inference_\d{8}\.txt$")
        assert any(dated_log.match(name) for name in file_names), file_names
+190 -1
View File
@@ -1 +1,190 @@
"""Video ingestion, frame sampling, and end-to-end media processing."""
import json
import threading
import time
import uuid
import pytest
def _video_load_url(mock_loader_url: str, video_media_path: str) -> str:
name = video_media_path.rstrip("/").split("/")[-1]
return f"{mock_loader_url.rstrip('/')}/load/{name}"
def _base_ai_body(mock_loader_url: str, video_path: str) -> dict:
return {
"probability_threshold": 0.25,
"frame_period_recognition": 4,
"frame_recognition_seconds": 2,
"tracking_distance_confidence": 0.0,
"tracking_probability_increase": 0.0,
"tracking_intersection_threshold": 0.6,
"altitude": 400.0,
"focal_length": 24.0,
"sensor_width": 23.5,
"paths": [_video_load_url(mock_loader_url, video_path)],
}
def _run_async_video_sse(
    http_client,
    jwt_token,
    sse_client_factory,
    media_id: str,
    body: dict,
    *,
    timed: bool = False,
    wait_s: float = 120.0,
):
    """Start async video detection and collect its SSE events until completion.

    Spawns a background listener on the SSE stream, then POSTs
    ``/detect/{media_id}`` with *body* and waits up to *wait_s* seconds for the
    terminal event (mediaStatus == "AIProcessed" with mediaPercent == 100).

    Returns the events collected for *media_id*; when *timed* is True each
    entry is a ``(monotonic_timestamp, event_dict)`` tuple instead of the bare
    event dict. Raises AssertionError if the POST is rejected, the terminal
    event never arrives in time, or the listener thread failed.
    """
    collected: list = []
    thread_exc: list[BaseException] = []
    done = threading.Event()
    def _listen():
        # Daemon thread: any failure is surfaced to the caller via thread_exc.
        try:
            with sse_client_factory() as sse:
                time.sleep(0.3)  # let the subscription register server-side
                for event in sse.events():
                    # Skip keep-alive / empty frames.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    # Other runs may stream concurrently; keep only our media.
                    if data.get("mediaId") != media_id:
                        continue
                    if timed:
                        collected.append((time.monotonic(), data))
                    else:
                        collected.append(data)
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
                    ):
                        break
        except BaseException as e:
            # BaseException so even interpreter-level failures are reported.
            thread_exc.append(e)
        finally:
            done.set()
    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    time.sleep(0.5)  # give the listener a head start before detection begins
    r = http_client.post(
        f"/detect/{media_id}",
        json=body,
        headers={"Authorization": f"Bearer {jwt_token}"},
    )
    assert r.status_code == 200
    assert r.json() == {"status": "started", "mediaId": media_id}
    assert done.wait(timeout=wait_s)
    th.join(timeout=5)
    assert not thread_exc, thread_exc
    return collected
def _assert_detection_dto(d: dict) -> None:
assert isinstance(d["centerX"], (int, float))
assert isinstance(d["centerY"], (int, float))
assert isinstance(d["width"], (int, float))
assert isinstance(d["height"], (int, float))
assert 0.0 <= float(d["centerX"]) <= 1.0
assert 0.0 <= float(d["centerY"]) <= 1.0
assert 0.0 <= float(d["width"]) <= 1.0
assert 0.0 <= float(d["height"]) <= 1.0
assert isinstance(d["classNum"], int)
assert isinstance(d["label"], str)
assert isinstance(d["confidence"], (int, float))
assert 0.0 <= float(d["confidence"]) <= 1.0
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_p_10_frame_sampling_ac1(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    video_short_path,
    sse_client_factory,
):
    """FT-P-10 / AC1: sampled video processing emits multiple progress events
    and ends with AIProcessed at 100%."""
    media_id = f"video-{uuid.uuid4().hex}"
    request_body = _base_ai_body(mock_loader_url, video_short_path)
    # Sample every 4th frame (stated explicitly even though it is the default).
    request_body["frame_period_recognition"] = 4
    events = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        request_body,
    )
    in_progress = [e for e in events if e.get("mediaStatus") == "AIProcessing"]
    assert len(in_progress) >= 2
    terminal = events[-1]
    assert terminal.get("mediaStatus") == "AIProcessed"
    assert terminal.get("mediaPercent") == 100
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_p_11_annotation_interval_ac2(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    video_short_path,
    sse_client_factory,
):
    """FT-P-11 / AC2: with a seconds-based recognition interval, progress
    events arrive in order and the run ends with AIProcessed at 100%."""
    media_id = f"video-{uuid.uuid4().hex}"
    request_body = _base_ai_body(mock_loader_url, video_short_path)
    request_body["frame_recognition_seconds"] = 2
    timeline = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        request_body,
        timed=True,
    )
    in_progress = [
        (stamp, payload)
        for stamp, payload in timeline
        if payload.get("mediaStatus") == "AIProcessing"
    ]
    assert len(in_progress) >= 2
    # Sanity check only: receipt timestamps must be non-decreasing.
    deltas = [
        later[0] - earlier[0]
        for earlier, later in zip(in_progress, in_progress[1:])
    ]
    assert all(delta >= 0.0 for delta in deltas)
    terminal = timeline[-1][1]
    assert terminal.get("mediaStatus") == "AIProcessed"
    assert terminal.get("mediaPercent") == 100
@pytest.mark.slow
@pytest.mark.timeout(120)
def test_ft_p_12_movement_tracking_ac3(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    video_short_path,
    sse_client_factory,
):
    """FT-P-12 / AC3: with tracking enabled, every annotation in every event
    is a well-formed detection DTO and the run ends with AIProcessed."""
    media_id = f"video-{uuid.uuid4().hex}"
    request_body = _base_ai_body(mock_loader_url, video_short_path)
    # Non-zero tracking parameters switch movement tracking on.
    request_body["tracking_distance_confidence"] = 0.1
    request_body["tracking_probability_increase"] = 0.1
    events = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        request_body,
    )
    for event in events:
        annotations = event.get("annotations")
        if not annotations:
            continue
        assert isinstance(annotations, list)
        for detection in annotations:
            _assert_detection_dto(detection)
    terminal = events[-1]
    assert terminal.get("mediaStatus") == "AIProcessed"
    assert terminal.get("mediaPercent") == 100