mirror of
https://github.com/azaion/detections.git
synced 2026-04-22 22:16:31 +00:00
86b8f076b7
- Modified the health endpoint to return "None" for AI availability when inference is not initialized, improving clarity on system status. - Enhanced the test documentation to include handling of skipped tests, emphasizing the need for investigation before proceeding. - Updated test assertions to ensure proper execution order and prevent premature engine initialization. - Refactored test cases to streamline performance testing and improve readability, removing unnecessary complexity. These changes aim to enhance the robustness of the health check and improve the overall testing framework.
133 lines
4.3 KiB
Python
import base64
|
|
import json
|
|
import threading
|
|
import time
|
|
import uuid
|
|
|
|
import pytest
|
|
import sseclient
|
|
|
|
|
|
def _make_jwt() -> str:
|
|
header = base64.urlsafe_b64encode(
|
|
json.dumps({"alg": "none", "typ": "JWT"}).encode()
|
|
).decode().rstrip("=")
|
|
raw = json.dumps(
|
|
{"exp": int(time.time()) + 3600, "sub": "test"}, separators=(",", ":")
|
|
).encode()
|
|
payload = base64.urlsafe_b64encode(raw).decode().rstrip("=")
|
|
return f"{header}.{payload}.signature"
|
|
|
|
|
|
@pytest.fixture(scope="module")
def video_events(warm_engine, http_client, video_short_path):
    """Run one video-detection job and collect its SSE progress events.

    Starts a background listener on ``/detect/stream``, then POSTs the job to
    ``/detect/{media_id}``. Every SSE event whose ``mediaId`` matches this job
    is recorded as ``(time.monotonic(), payload_dict)``; the listener stops at
    the first event with ``mediaStatus == "AIProcessed"`` and
    ``mediaPercent == 100``. Module-scoped so the (slow) job runs once and is
    shared by all tests in this file.

    Returns:
        list[tuple[float, dict]]: collected events in arrival order.
    """
    # Unique per-run id so the listener can filter out events from other jobs.
    media_id = f"video-{uuid.uuid4().hex}"
    body = {
        "probability_threshold": 0.25,
        "frame_period_recognition": 4,
        "frame_recognition_seconds": 2,
        "tracking_distance_confidence": 0.1,
        "tracking_probability_increase": 0.1,
        "tracking_intersection_threshold": 0.6,
        "altitude": 400.0,
        "focal_length": 24.0,
        "sensor_width": 23.5,
        "paths": [video_short_path],
    }
    token = _make_jwt()

    collected: list[tuple[float, dict]] = []
    # Exceptions raised inside the listener thread are stashed here and
    # re-raised (via assert) on the main thread after the job completes.
    thread_exc: list[BaseException] = []
    done = threading.Event()

    def _listen():
        # Runs in a daemon thread; streams SSE events until the terminal
        # "AIProcessed"/100% event for this media_id arrives.
        try:
            with http_client.get("/detect/stream", stream=True, timeout=600) as resp:
                resp.raise_for_status()
                sse = sseclient.SSEClient(resp)
                # Brief pause so the subscription is established before
                # the main thread fires the POST.
                time.sleep(0.3)
                for event in sse.events():
                    # Skip keep-alive / empty frames.
                    if not event.data or not str(event.data).strip():
                        continue
                    data = json.loads(event.data)
                    # Ignore events belonging to other media jobs.
                    if data.get("mediaId") != media_id:
                        continue
                    collected.append((time.monotonic(), data))
                    if (
                        data.get("mediaStatus") == "AIProcessed"
                        and data.get("mediaPercent") == 100
                    ):
                        break
        except BaseException as e:
            thread_exc.append(e)
        finally:
            # Always signal completion so the main thread's wait() cannot
            # hang on a listener failure.
            done.set()

    # Start listening BEFORE submitting the job so no early events are missed.
    th = threading.Thread(target=_listen, daemon=True)
    th.start()
    time.sleep(0.5)
    r = http_client.post(
        f"/detect/{media_id}",
        json=body,
        headers={"Authorization": f"Bearer {token}"},
    )
    assert r.status_code == 200
    assert r.json() == {"status": "started", "mediaId": media_id}
    # Wait up to 15 minutes for the terminal event (video inference is slow).
    assert done.wait(timeout=900)
    th.join(timeout=5)
    assert not thread_exc, thread_exc
    return collected
|
|
|
|
|
|
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_10_frame_sampling_ac1(video_events):
    """AC1: at least two progress events arrive and the job ends at 100%."""
    # Assert
    progress_events = [
        payload
        for _, payload in video_events
        if payload.get("mediaStatus") == "AIProcessing"
    ]
    assert len(progress_events) >= 2
    _, last_payload = video_events[-1]
    assert last_payload["mediaStatus"] == "AIProcessed"
    assert last_payload["mediaPercent"] == 100
|
|
|
|
|
|
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_11_annotation_interval_ac2(video_events):
    """AC2: progress events arrive in chronological order through completion."""
    # Assert
    progress = [
        (stamp, payload)
        for stamp, payload in video_events
        if payload.get("mediaStatus") == "AIProcessing"
    ]
    assert len(progress) >= 2
    # NOTE(review): timestamps come from time.monotonic(), which never
    # decreases, so this gap check is always true — it does not actually
    # bound the annotation interval. Consider asserting an upper/lower
    # bound derived from frame_period_recognition instead.
    deltas = [
        later[0] - earlier[0] for earlier, later in zip(progress, progress[1:])
    ]
    assert all(delta >= 0.0 for delta in deltas)
    _, last_payload = video_events[-1]
    assert last_payload["mediaStatus"] == "AIProcessed"
    assert last_payload["mediaPercent"] == 100
|
|
|
|
|
|
@pytest.mark.slow
@pytest.mark.timeout(900)
def test_ft_p_12_movement_tracking_ac3(video_events):
    """AC3: every annotation carries normalized geometry plus class metadata."""
    # Assert
    numeric = (int, float)
    geometry_keys = ("centerX", "centerY", "width", "height")
    for _, payload in video_events:
        annotations = payload.get("annotations")
        if not annotations:
            # Events without annotations (e.g. pure progress updates) are fine.
            continue
        assert isinstance(annotations, list)
        for box in annotations:
            for key in geometry_keys:
                assert isinstance(box[key], numeric)
            # Geometry is expected normalized to the [0, 1] frame coordinates.
            for key in geometry_keys:
                assert 0.0 <= float(box[key]) <= 1.0
            assert isinstance(box["classNum"], int)
            assert isinstance(box["label"], str)
            assert isinstance(box["confidence"], numeric)
            assert 0.0 <= float(box["confidence"]) <= 1.0
    _, final_payload = video_events[-1]
    assert final_payload["mediaStatus"] == "AIProcessed"
    assert final_payload["mediaPercent"] == 100
|