Files
detections/e2e/tests/test_performance.py
T
2026-03-23 22:28:59 +02:00

176 lines
5.1 KiB
Python

import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
import pytest
def _percentile_ms(sorted_ms, p):
n = len(sorted_ms)
if n == 0:
return 0.0
if n == 1:
return float(sorted_ms[0])
k = (n - 1) * (p / 100.0)
lo = int(k)
hi = min(lo + 1, n - 1)
w = k - lo
return sorted_ms[lo] * (1 - w) + sorted_ms[hi] * w
@pytest.mark.slow
@pytest.mark.timeout(300)
def test_nft_perf_01_single_image_latency_p95(
    warm_engine, http_client, image_small
):
    """Check p95 latency of sequential single-image ``/detect`` requests.

    Posts the small image 10 times in a row, requires HTTP 200 for each,
    prints the sorted latencies with p50/p95/p99 as one CSV-style line, and
    asserts that p95 stays below 5000 ms.

    ``warm_engine`` is requested only as a fixture dependency; it is not
    referenced in the body (presumably it pre-warms the service -- confirm
    against the fixture definition).
    """
    latencies_ms = []
    for _ in range(10):
        started = time.perf_counter()
        response = http_client.post(
            "/detect",
            files={"file": ("img.jpg", image_small, "image/jpeg")},
            timeout=120,
        )
        # Stop the clock before asserting so the check is not timed.
        took_ms = (time.perf_counter() - started) * 1000.0
        assert response.status_code == 200
        latencies_ms.append(took_ms)

    ordered = sorted(latencies_ms)
    p50 = _percentile_ms(ordered, 50)
    p95 = _percentile_ms(ordered, 95)
    p99 = _percentile_ms(ordered, 99)

    runs_csv = ",".join(f"{x:.2f}" for x in ordered)
    summary_csv = f",p50,{p50:.2f},p95,{p95:.2f},p99,{p99:.2f}"
    print("nft_perf_01_csv,run_ms," + runs_csv + summary_csv)

    assert p95 < 5000.0
def _post_small(http_client, image_small):
    """POST *image_small* to ``/detect`` as a JPEG upload and return the response.

    The request uses a 120 second timeout and the multipart field name
    ``file`` with filename ``img.jpg``.
    """
    upload = {"file": ("img.jpg", image_small, "image/jpeg")}
    response = http_client.post("/detect", files=upload, timeout=120)
    return response
@pytest.mark.slow
@pytest.mark.timeout(300)
def test_nft_perf_02_concurrent_throughput_queuing(
    warm_engine, http_client, image_small
):
    """Compare wall-clock time of 2 vs 3 concurrent ``/detect`` requests.

    Runs a batch of two simultaneous small-image posts, then a batch of
    three, requiring HTTP 200 for every response. If the two-request batch
    finished in under 4 seconds the test is skipped, because the comparison
    is considered too noisy at that scale. Otherwise the three-request batch
    must take more than 0.25 s longer than the two-request batch.

    ``warm_engine`` is requested only as a fixture dependency; it is not
    referenced in the body.
    """

    def run_concurrent(count):
        """Fire *count* simultaneous posts; return ``(wall_seconds, responses)``."""
        t0 = time.monotonic()
        with ThreadPoolExecutor(max_workers=count) as ex:
            futs = [
                ex.submit(_post_small, http_client, image_small)
                for _ in range(count)
            ]
            # Collect inside the ``with`` so a failing request surfaces here.
            rs = [f.result() for f in futs]
        return time.monotonic() - t0, rs

    wall2, rs2 = run_concurrent(2)
    assert all(r.status_code == 200 for r in rs2), (
        f"2-way batch statuses: {[r.status_code for r in rs2]}"
    )
    wall3, rs3 = run_concurrent(3)
    assert all(r.status_code == 200 for r in rs3), (
        f"3-way batch statuses: {[r.status_code for r in rs3]}"
    )

    if wall2 < 4.0:
        pytest.skip("wall clock too small for queuing comparison")
    assert wall3 > wall2 + 0.25, (
        f"expected 3 concurrent requests ({wall3:.2f}s) to take more than "
        f"0.25s longer than 2 concurrent requests ({wall2:.2f}s)"
    )
@pytest.mark.slow
@pytest.mark.timeout(300)
def test_nft_perf_03_tiling_overhead_large_image(
    warm_engine, http_client, image_small, image_large
):
    """Time a large-image ``/detect`` request against a small-image baseline.

    Posts the small image first to get a baseline latency, then posts the
    large image together with a JSON ``config`` form field (altitude, focal
    length, sensor width). Both must return HTTP 200. The large request must
    finish in under 120 000 ms and must not be more than 500 ms faster than
    the baseline. Both timings are printed as one CSV-style line.

    ``warm_engine`` is requested only as a fixture dependency; it is not
    referenced in the body.
    """
    # Baseline: small image, no extra form data.
    baseline_start = time.perf_counter()
    baseline_resp = http_client.post(
        "/detect",
        files={"file": ("small.jpg", image_small, "image/jpeg")},
        timeout=120,
    )
    small_ms = (time.perf_counter() - baseline_start) * 1000.0
    assert baseline_resp.status_code == 200

    # Serialized outside the timed window so it does not count as latency.
    camera_config = json.dumps(
        {"altitude": 400, "focal_length": 24, "sensor_width": 23.5}
    )

    large_start = time.perf_counter()
    large_resp = http_client.post(
        "/detect",
        files={"file": ("large.jpg", image_large, "image/jpeg")},
        data={"config": camera_config},
        timeout=120,
    )
    large_ms = (time.perf_counter() - large_start) * 1000.0
    assert large_resp.status_code == 200
    assert large_ms < 120_000.0

    print(
        f"nft_perf_03_csv,baseline_small_ms,{small_ms:.2f},large_ms,{large_ms:.2f}"
    )
    # The large image may be slower, but must not be markedly faster.
    assert large_ms > small_ms - 500.0
@pytest.mark.slow
@pytest.mark.timeout(300)
def test_nft_perf_04_video_frame_rate_sse(
    warm_engine,
    http_client,
    jwt_token,
    mock_loader_url,
    sse_client_factory,
):
    """Check the pacing of SSE events emitted while a short video is processed.

    A background thread opens an SSE client and records a monotonic timestamp
    for each non-empty event whose ``mediaId`` matches this run. It stops when
    an event reports ``mediaStatus == "AIProcessed"`` with
    ``mediaPercent == 100``. The main thread then posts the detection request
    to ``/detect/{media_id}`` with a bearer token.

    The test passes when:
      * the POST returns HTTP 200,
      * the listener finishes within 120 seconds without raising,
      * at least two matching events were received,
      * first-to-last event span is at most 120 seconds, and
      * no gap between consecutive events exceeds 30 seconds.

    ``warm_engine`` is requested only as a fixture dependency; it is not
    referenced in the body.
    """
    media_id = f"perf-sse-{uuid.uuid4().hex}"
    loader_root = mock_loader_url.rstrip("/")
    request_body = {
        "probability_threshold": 0.25,
        "paths": [f"{loader_root}/load/video_short01.mp4"],
        "frame_period_recognition": 4,
        "frame_recognition_seconds": 2,
    }
    auth_headers = {"Authorization": f"Bearer {jwt_token}"}

    arrival_times = []
    listener_errors = []
    listener_finished = threading.Event()

    def _listen():
        """Collect arrival times of matching SSE events until the final one."""
        try:
            with sse_client_factory() as stream:
                time.sleep(0.3)
                for event in stream.events():
                    # Ignore events with an empty or whitespace-only payload.
                    if not event.data or not str(event.data).strip():
                        continue
                    payload = json.loads(event.data)
                    if payload.get("mediaId") != media_id:
                        continue
                    arrival_times.append(time.monotonic())
                    is_final = (
                        payload.get("mediaStatus") == "AIProcessed"
                        and payload.get("mediaPercent") == 100
                    )
                    if is_final:
                        break
        except BaseException as err:
            # Handed to the main thread, which asserts that none occurred.
            listener_errors.append(err)
        finally:
            listener_finished.set()

    listener = threading.Thread(target=_listen, daemon=True)
    listener.start()
    # Presumably gives the listener time to subscribe before detection is
    # triggered -- confirm against the SSE client behaviour.
    time.sleep(0.5)

    response = http_client.post(
        f"/detect/{media_id}", json=request_body, headers=auth_headers
    )
    assert response.status_code == 200

    finished_in_time = listener_finished.wait(timeout=120)
    assert finished_in_time
    listener.join(timeout=5)
    assert not listener_errors
    assert len(arrival_times) >= 2

    total_span = arrival_times[-1] - arrival_times[0]
    assert total_span <= 120.0

    gaps = [
        later - earlier
        for earlier, later in zip(arrival_times, arrival_times[1:])
    ]
    assert max(gaps) <= 30.0