mirror of
https://github.com/azaion/detections.git
synced 2026-04-22 21:56:33 +00:00
8baa96978b
- Updated the detection image endpoint to require a channel ID for event streaming. - Introduced a new endpoint for streaming detection events, allowing clients to receive real-time updates. - Enhanced the internal buffering mechanism for detection events to manage multiple channels. - Refactored the inference module to support the new event handling structure. Made-with: Cursor
33 lines
994 B
Python
33 lines
994 B
Python
import pytest
|
|
import requests
|
|
|
|
_DETECT_TIMEOUT = 60
|
|
|
|
|
|
def test_nft_res_01_loader_outage_after_init(
|
|
warm_engine, image_detect, mock_loader_url, image_small, http_client
|
|
):
|
|
requests.post(
|
|
f"{mock_loader_url}/mock/config", json={"mode": "error"}, timeout=10
|
|
).raise_for_status()
|
|
detections, _ = image_detect(image_small, "r1.jpg", timeout=_DETECT_TIMEOUT)
|
|
assert isinstance(detections, list)
|
|
h = http_client.get("/health")
|
|
assert h.status_code == 200
|
|
hd = h.json()
|
|
assert hd["status"] == "healthy"
|
|
assert hd.get("errorMessage") is None
|
|
|
|
|
|
def test_nft_res_03_transient_loader_first_fail(
|
|
mock_loader_url, image_detect, image_small
|
|
):
|
|
requests.post(
|
|
f"{mock_loader_url}/mock/config", json={"mode": "first_fail"}, timeout=10
|
|
).raise_for_status()
|
|
try:
|
|
image_detect(image_small, "r3a.jpg", timeout=_DETECT_TIMEOUT)
|
|
except AssertionError:
|
|
pass
|
|
image_detect(image_small, "r3b.jpg", timeout=_DETECT_TIMEOUT)
|