Update health endpoint and refine test documentation

- Modified the health endpoint to return "None" for AI availability when inference is not initialized, improving clarity on system status.
- Enhanced the test documentation to include handling of skipped tests, emphasizing the need for investigation before proceeding.
- Updated test assertions to ensure proper execution order and prevent premature engine initialization.
- Refactored test cases to streamline performance testing and improve readability, removing unnecessary complexity.

These changes aim to enhance the robustness of the health check and improve the overall testing framework.
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-03-30 01:17:53 +03:00
parent 5a968edcba
commit 86b8f076b7
11 changed files with 2130 additions and 307 deletions
+59 -153
View File
@@ -1,78 +1,50 @@
import csv
import base64
import json
import os
import threading
import time
import uuid
import pytest
RESULTS_DIR = os.environ.get("RESULTS_DIR", "/results")
import sseclient
def _base_ai_body(video_path: str) -> dict:
    # Build the default request body for a detection run on `video_path`.
    # NOTE(review): the dict contents are elided by the diff hunk in this
    # view — only the opening "return {" survives; confirm against the
    # full file before relying on specific fields.
    return {
def _make_jwt() -> str:
header = base64.urlsafe_b64encode(
json.dumps({"alg": "none", "typ": "JWT"}).encode()
).decode().rstrip("=")
raw = json.dumps(
{"exp": int(time.time()) + 3600, "sub": "test"}, separators=(",", ":")
).encode()
payload = base64.urlsafe_b64encode(raw).decode().rstrip("=")
return f"{header}.{payload}.signature"
@pytest.fixture(scope="module")
def video_events(warm_engine, http_client, video_short_path):
    # Module-scoped fixture: intended to run one detection over the short test
    # video and share the collected SSE events across the tests below.
    # NOTE(review): this span is a web diff rendering — removed and added
    # lines are interleaved, so the duplicate dict keys below are before/after
    # pairs (the later literal wins in Python); the fixture body also appears
    # truncated (no visible return/yield).
    media_id = f"video-{uuid.uuid4().hex}"
    body = {
        "probability_threshold": 0.25,
        "frame_period_recognition": 4,
        "frame_recognition_seconds": 2,
        "tracking_distance_confidence": 0.0,   # removed side of diff
        "tracking_probability_increase": 0.0,  # removed side of diff
        "tracking_distance_confidence": 0.1,   # added side of diff (supersedes)
        "tracking_probability_increase": 0.1,  # added side of diff (supersedes)
        "tracking_intersection_threshold": 0.6,
        "altitude": 400.0,
        "focal_length": 24.0,
        "sensor_width": 23.5,
        "paths": [video_path],        # removed side — `video_path` is not a parameter here
        "paths": [video_short_path],  # added side supersedes the previous key
    }
    token = _make_jwt()
def _save_events_csv(video_path: str, events: list[dict]):
stem = os.path.splitext(os.path.basename(video_path))[0]
path = os.path.join(RESULTS_DIR, f"{stem}_detections.csv")
rows = []
for ev in events:
base = {
"mediaId": ev.get("mediaId", ""),
"mediaStatus": ev.get("mediaStatus", ""),
"mediaPercent": ev.get("mediaPercent", ""),
}
anns = ev.get("annotations") or []
if anns:
for det in anns:
rows.append({**base, **det})
else:
rows.append(base)
if not rows:
return
fieldnames = list(rows[0].keys())
for r in rows[1:]:
for k in r:
if k not in fieldnames:
fieldnames.append(k)
with open(path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
def _run_async_video_sse(
    http_client,
    jwt_token,
    sse_client_factory,
    media_id: str,
    body: dict,
    *,
    timed: bool = False,
    wait_s: float = 900.0,
):
    # Start an async detection job for `media_id` and concurrently listen on
    # the SSE stream until the terminal AIProcessed/100 event arrives.
    # NOTE(review): this span is a diff rendering — the "@@ ... @@" hunk
    # headers below mark elided lines, and several statements appear in both
    # their removed and added forms; it is not runnable as shown.
    video_path = (body.get("paths") or [""])[0]
    collected: list = []                      # removed side of diff
    raw_events: list[dict] = []               # removed side (fed the CSV dump)
    collected: list[tuple[float, dict]] = []  # added side: (monotonic time, event)
    thread_exc: list[BaseException] = []      # exceptions from the listener thread
    done = threading.Event()                  # set when the listener finishes
    def _listen():
        try:
            with sse_client_factory() as sse:  # removed side: old transport
            with http_client.get("/detect/stream", stream=True, timeout=600) as resp:
                resp.raise_for_status()
                sse = sseclient.SSEClient(resp)
                time.sleep(0.3)  # let the stream settle before the POST below
                for event in sse.events():
                    if not event.data or not str(event.data).strip():
@@ -80,11 +52,7 @@ def _run_async_video_sse(
                    data = json.loads(event.data)
                    if data.get("mediaId") != media_id:
                        continue  # ignore events for other media
                    raw_events.append(data)                        # removed side
                    if timed:                                      # removed side
                        collected.append((time.monotonic(), data))
                    else:
                        collected.append(data)
                    collected.append((time.monotonic(), data))  # added side: always timed
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
@@ -93,11 +61,6 @@ def _run_async_video_sse(
        except BaseException as e:
            thread_exc.append(e)  # surfaced to the main thread after join
        finally:
            if video_path and raw_events:  # removed side: best-effort CSV export
                try:
                    _save_events_csv(video_path, raw_events)
                except Exception:
                    pass  # export must never fail the test run
            done.set()
    th = threading.Thread(target=_listen, daemon=True)
@@ -106,121 +69,64 @@ def _run_async_video_sse(
    r = http_client.post(
        f"/detect/{media_id}",
        json=body,
        headers={"Authorization": f"Bearer {jwt_token}"},  # removed side
        headers={"Authorization": f"Bearer {token}"},      # added side: module token
    )
    assert r.status_code == 200
    assert r.json() == {"status": "started", "mediaId": media_id}
    assert done.wait(timeout=wait_s)  # removed side: configurable ceiling
    assert done.wait(timeout=900)     # added side: fixed 900 s ceiling
    th.join(timeout=5)
    assert not thread_exc, thread_exc
    return collected
def _assert_detection_dto(d: dict) -> None:
assert isinstance(d["centerX"], (int, float))
assert isinstance(d["centerY"], (int, float))
assert isinstance(d["width"], (int, float))
assert isinstance(d["height"], (int, float))
assert 0.0 <= float(d["centerX"]) <= 1.0
assert 0.0 <= float(d["centerY"]) <= 1.0
assert 0.0 <= float(d["width"]) <= 1.0
assert 0.0 <= float(d["height"]) <= 1.0
assert isinstance(d["classNum"], int)
assert isinstance(d["label"], str)
assert isinstance(d["confidence"], (int, float))
assert 0.0 <= float(d["confidence"]) <= 1.0
@pytest.mark.skip(reason="Single video run — covered by test_ft_p09_sse_event_delivery")
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_10_frame_sampling_ac1(
    warm_engine,
    http_client,
    jwt_token,
    video_short_path,
    sse_client_factory,
):
    # Removed variant: drives its own detection run via the SSE helper.
    # NOTE(review): diff rendering — the removed and added versions of this
    # test are interleaved in this span.
    media_id = f"video-{uuid.uuid4().hex}"
    body = _base_ai_body(video_short_path)
    body["frame_period_recognition"] = 4
    collected = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        body,
    )
    processing = [e for e in collected if e.get("mediaStatus") == "AIProcessing"]
def test_ft_p_10_frame_sampling_ac1(video_events):
    # Added variant: consumes the shared module-scoped `video_events` fixture
    # (list of (timestamp, event) tuples) instead of running its own video.
    # Assert
    processing = [d for _, d in video_events if d.get("mediaStatus") == "AIProcessing"]
    assert len(processing) >= 2
    final = collected[-1]  # removed side
    assert final.get("mediaStatus") == "AIProcessed"
    assert final.get("mediaPercent") == 100
    final = video_events[-1][1]  # added side: last event's dict
    assert final["mediaStatus"] == "AIProcessed"
    assert final["mediaPercent"] == 100
@pytest.mark.skip(reason="Single video run — covered by test_ft_p09_sse_event_delivery")
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_11_annotation_interval_ac2(
    warm_engine,
    http_client,
    jwt_token,
    video_short_path,
    sse_client_factory,
):
    # Removed variant: runs its own timed detection via the SSE helper.
    # NOTE(review): diff rendering — removed and added versions interleaved.
    media_id = f"video-{uuid.uuid4().hex}"
    body = _base_ai_body(video_short_path)
    body["frame_recognition_seconds"] = 2
    collected = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        body,
        timed=True,
    )
def test_ft_p_11_annotation_interval_ac2(video_events):
    # Added variant: consumes the shared fixture of (timestamp, event) tuples.
    # Assert
    processing = [
        (t, d) for t, d in collected if d.get("mediaStatus") == "AIProcessing"
        (t, d) for t, d in video_events if d.get("mediaStatus") == "AIProcessing"
    ]
    assert len(processing) >= 2
    gaps = [
        processing[i][0] - processing[i - 1][0]
        for i in range(1, len(processing))
    ]
    gaps = [processing[i][0] - processing[i - 1][0] for i in range(1, len(processing))]
    assert all(g >= 0.0 for g in gaps)  # monotonic timestamps -> non-negative gaps
    final = collected[-1][1]  # removed side
    assert final.get("mediaStatus") == "AIProcessed"
    assert final.get("mediaPercent") == 100
    final = video_events[-1][1]  # added side
    assert final["mediaStatus"] == "AIProcessed"
    assert final["mediaPercent"] == 100
@pytest.mark.skip(reason="Single video run — covered by test_ft_p09_sse_event_delivery")
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_12_movement_tracking_ac3(
    warm_engine,
    http_client,
    jwt_token,
    video_short_path,
    sse_client_factory,
):
    # Removed variant: runs its own detection with explicit tracking params.
    # NOTE(review): diff rendering — removed and added versions interleaved.
    media_id = f"video-{uuid.uuid4().hex}"
    body = _base_ai_body(video_short_path)
    body["tracking_distance_confidence"] = 0.1
    body["tracking_probability_increase"] = 0.1
    collected = _run_async_video_sse(
        http_client,
        jwt_token,
        sse_client_factory,
        media_id,
        body,
    )
    for e in collected:
def test_ft_p_12_movement_tracking_ac3(video_events):
    # Added variant: validates every detection DTO carried by the shared
    # fixture events, then checks the terminal event.
    # Assert
    for _, e in video_events:
        anns = e.get("annotations")
        if not anns:
            continue  # progress-only events carry no annotations
        assert isinstance(anns, list)
        for d in anns:
            _assert_detection_dto(d)  # removed side: shared validator helper
    final = collected[-1]  # removed side
    assert final.get("mediaStatus") == "AIProcessed"
    assert final.get("mediaPercent") == 100
            assert isinstance(d["centerX"], (int, float))  # added side: inlined checks
            assert isinstance(d["centerY"], (int, float))
            assert isinstance(d["width"], (int, float))
            assert isinstance(d["height"], (int, float))
            assert 0.0 <= float(d["centerX"]) <= 1.0
            assert 0.0 <= float(d["centerY"]) <= 1.0
            assert 0.0 <= float(d["width"]) <= 1.0
            assert 0.0 <= float(d["height"]) <= 1.0
            assert isinstance(d["classNum"], int)
            assert isinstance(d["label"], str)
            assert isinstance(d["confidence"], (int, float))
            assert 0.0 <= float(d["confidence"]) <= 1.0
    final = video_events[-1][1]  # added side
    assert final["mediaStatus"] == "AIProcessed"
    assert final["mediaPercent"] == 100