"""AZ-701 — replay_api unit tests. Covers the AC matrix without invoking the real `gps-denied-replay` subprocess. A fake `ReplayRunner` writes deterministic emissions into the per-job output dir; everything downstream (job state, HTTP handlers, magic-byte validation, auth, concurrency) is then exercised against real FastAPI routing via `httpx.AsyncClient`. FastAPI / uvicorn / python-multipart are operator-only deps — the whole module skips cleanly when any is missing. Style: every test follows Arrange / Act / Assert. """ from __future__ import annotations import json import os import threading import time from collections.abc import Iterator from pathlib import Path from typing import Any import pytest fastapi = pytest.importorskip( "fastapi", reason="FastAPI is an operator-only dep; install gps-denied-onboard[operator-tools]", ) pytest.importorskip("httpx", reason="httpx required for the FastAPI TestClient") pytest.importorskip("multipart", reason="python-multipart required by FastAPI") from fastapi.testclient import TestClient from gps_denied_onboard.replay_api import ( JobState, create_app, ) from gps_denied_onboard.replay_api.handlers import ( validate_tlog_kind, validate_video_kind, ) from gps_denied_onboard.replay_api.interface import ( ReplayInputs, ReplayJobResult, ) from gps_denied_onboard.replay_api.jobs import JobRegistry from gps_denied_onboard.replay_api.storage import StorageRoot # --------------------------------------------------------------------- # Fixtures + fakes class _FakeRunner: """Deterministic runner that writes a single emissions row.""" def __init__(self, *, delay_s: float = 0.0, fail: bool = False) -> None: self.delay_s = delay_s self.fail = fail self.calls: list[ReplayInputs] = [] def run(self, inputs: ReplayInputs, *, output_dir: Path) -> ReplayJobResult: self.calls.append(inputs) if self.delay_s: time.sleep(self.delay_s) if self.fail: raise RuntimeError("fake runner forced failure") emissions = output_dir / "emissions.jsonl" emissions.write_text( json.dumps( { "frame_id": 0, "position_wgs84": { "lat_deg": 50.0, "lon_deg": 30.0, "alt_m": 100.0, }, "emitted_at": 0, } ) + "\n" ) report = output_dir / "accuracy_report.md" report.write_text("# Fake report\n\n**Verdict**: PASS\n") map_html = output_dir / "map.html" map_html.write_text("fake map") return ReplayJobResult( emissions_jsonl_path=emissions, accuracy_report_md_path=report, map_html_path=map_html, ) def _valid_tlog_bytes() -> bytes: """First 8 bytes are a microsecond timestamp; byte 8 = MAVLink magic.""" return b"\x00\x00\x00\x00\x00\x00\x00\x00\xfd" + b"\x00" * 32 def _valid_mp4_bytes() -> bytes: """ISO mp4: any size prefix + 'ftyp' marker at offset 4.""" return b"\x00\x00\x00\x20ftypisom\x00\x00\x02\x00mp42" + b"\x00" * 16 def _valid_calibration_bytes() -> bytes: return b'{"focal_length": 1, "acquisition_method": "factory-sheet"}' @pytest.fixture(autouse=True) def _disable_auth_by_default(monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: monkeypatch.setenv("REPLAY_API_AUTH_REQUIRED", "false") monkeypatch.delenv("REPLAY_API_BEARER_TOKEN", raising=False) yield @pytest.fixture def storage(tmp_path: Path) -> StorageRoot: return StorageRoot(tmp_path / "replay_api") @pytest.fixture def fake_runner() -> _FakeRunner: return _FakeRunner() @pytest.fixture def make_app( storage: StorageRoot, ) -> Any: def _factory( runner: Any, *, max_concurrent: int = 1, max_queued: int = 8, sync_max_bytes: int = 10_000_000, ) -> Any: registry = JobRegistry( runner=runner, storage=storage, max_concurrent=max_concurrent, max_queued=max_queued, ) return create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=sync_max_bytes, ) return _factory # --------------------------------------------------------------------- # Magic-byte validation (AC-9) def test_validate_tlog_kind_accepts_mavlink_v2_magic() -> None: # Act / Assert — must not raise validate_tlog_kind(_valid_tlog_bytes()) def test_validate_tlog_kind_rejects_zip_renamed_to_tlog() -> None: # Arrange — ZIP magic bytes at offset 0; pre-bytes 0..7 are # the (forged) timestamp; byte 8 holds the (non-MAVLink) magic. bogus = b"\x00\x00\x00\x00\x00\x00\x00\x00PK\x03\x04rest_of_zip_header" # Act / Assert with pytest.raises(Exception) as exc: validate_tlog_kind(bogus) assert "MAVLink" in str(exc.value) def test_validate_video_kind_accepts_mp4_ftyp() -> None: validate_video_kind(_valid_mp4_bytes()) def test_validate_video_kind_rejects_arbitrary_bytes() -> None: with pytest.raises(Exception) as exc: validate_video_kind(b"\x00" * 64) assert "ftyp" in str(exc.value) # --------------------------------------------------------------------- # AC-1 — sync POST → 200 + JSONL def test_post_replay_sync_returns_200_with_result_urls( fake_runner: _FakeRunner, make_app: Any, ) -> None: # Arrange app = make_app(fake_runner) client = TestClient(app) # Act response = client.post( "/replay", files={ "tlog": ("derkachi.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("derkachi.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ( "khp20s30.json", _valid_calibration_bytes(), "application/json", ), }, data={"pace": "asap"}, ) # Assert assert response.status_code == 200, response.text body = response.json() assert body["state"] == JobState.DONE.value assert body["sync"] is True assert body["emissions_jsonl_url"].endswith("/result") assert body["map_html_url"].endswith("/map") assert body["accuracy_report_md_url"].endswith("/report") # Runner saw exactly one job with the expected pace + auto-trim default. assert len(fake_runner.calls) == 1 assert fake_runner.calls[0].pace == "asap" assert fake_runner.calls[0].auto_trim is True def test_post_replay_serves_jsonl_and_map_for_done_job( fake_runner: _FakeRunner, make_app: Any, ) -> None: # Arrange app = make_app(fake_runner) client = TestClient(app) response = client.post( "/replay", files={ "tlog": ("derkachi.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("derkachi.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) body = response.json() job_id = body["job_id"] # Act jsonl_resp = client.get(f"/jobs/{job_id}/result") map_resp = client.get(f"/jobs/{job_id}/map") report_resp = client.get(f"/jobs/{job_id}/report") # Assert assert jsonl_resp.status_code == 200 assert "lat_deg" in jsonl_resp.text assert map_resp.status_code == 200 assert "fake map" in map_resp.text assert report_resp.status_code == 200 assert "**Verdict**: PASS" in report_resp.text # --------------------------------------------------------------------- # AC-2 — async POST → 202 + job id def test_post_replay_async_returns_202_when_video_exceeds_sync_bytes( storage: StorageRoot, ) -> None: # Arrange — runner sleeps so we observe the queued/running state. runner = _FakeRunner(delay_s=0.2) registry = JobRegistry(runner=runner, storage=storage, max_concurrent=1) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10, # any non-trivial video exceeds this ) client = TestClient(app) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ( "k.json", _valid_calibration_bytes(), "application/json", ), }, ) # Assert assert response.status_code == 202, response.text body = response.json() assert body["state"] in {JobState.QUEUED.value, JobState.RUNNING.value} assert "Location" in response.headers assert response.headers["Location"] == f"/jobs/{body['job_id']}" _wait_done(client, body["job_id"]) # --------------------------------------------------------------------- # AC-3 — job state transitions queued → running → done def test_job_state_transitions_observable_via_polling( storage: StorageRoot, ) -> None: # Arrange runner = _FakeRunner(delay_s=0.3) registry = JobRegistry(runner=runner, storage=storage, max_concurrent=1) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10, ) client = TestClient(app) response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) job_id = response.json()["job_id"] # Act + Assert — poll until done; record the unique states seen. seen: set[str] = set() deadline = time.monotonic() + 10.0 while time.monotonic() < deadline: snap = client.get(f"/jobs/{job_id}").json() seen.add(snap["state"]) if snap["state"] == JobState.DONE.value: break time.sleep(0.05) assert JobState.DONE.value in seen # We expect to have seen at least one of queued/running before done. assert seen & {JobState.QUEUED.value, JobState.RUNNING.value} def test_failed_runner_marks_job_failed( storage: StorageRoot, ) -> None: # Arrange runner = _FakeRunner(fail=True) registry = JobRegistry(runner=runner, storage=storage) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10 ) client = TestClient(app) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) job_id = response.json()["job_id"] snap = _wait_terminal(client, job_id) # Assert assert snap["state"] == JobState.FAILED.value assert "fake runner forced failure" in (snap["error"] or "") # --------------------------------------------------------------------- # AC-4 — result + map served from job id (covered above) def test_result_endpoints_409_when_job_not_done( storage: StorageRoot, ) -> None: # Arrange — slow runner so job stays running long enough to probe. runner = _FakeRunner(delay_s=0.5) registry = JobRegistry(runner=runner, storage=storage) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10 ) client = TestClient(app) job_id = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ).json()["job_id"] # Act — race the runner; we want to hit the not-done branch. res = client.get(f"/jobs/{job_id}/result") # Assert if res.status_code == 200: pytest.skip("runner finished before we could probe the 409 path") assert res.status_code == 409 body = res.json() assert body["error_code"] == "job_not_complete" _wait_done(client, job_id) # --------------------------------------------------------------------- # AC-5 — auth enforced when configured def test_post_replay_returns_401_without_bearer_when_required( monkeypatch: pytest.MonkeyPatch, storage: StorageRoot, fake_runner: _FakeRunner, ) -> None: # Arrange monkeypatch.setenv("REPLAY_API_AUTH_REQUIRED", "true") monkeypatch.setenv("REPLAY_API_BEARER_TOKEN", "shibboleth") registry = JobRegistry(runner=fake_runner, storage=storage) app = create_app( runner=fake_runner, storage=storage, registry=registry, sync_max_bytes=10_000_000, ) client = TestClient(app) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) # Assert assert response.status_code == 401 assert response.json()["error_code"] == "unauthorized" def test_post_replay_accepts_correct_bearer( monkeypatch: pytest.MonkeyPatch, storage: StorageRoot, fake_runner: _FakeRunner, ) -> None: # Arrange monkeypatch.setenv("REPLAY_API_AUTH_REQUIRED", "true") monkeypatch.setenv("REPLAY_API_BEARER_TOKEN", "shibboleth") registry = JobRegistry(runner=fake_runner, storage=storage) app = create_app( runner=fake_runner, storage=storage, registry=registry, sync_max_bytes=10_000_000, ) client = TestClient(app) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, headers={"Authorization": "Bearer shibboleth"}, ) # Assert assert response.status_code == 200, response.text # --------------------------------------------------------------------- # AC-6 — health endpoints def test_healthz_always_returns_200(fake_runner: _FakeRunner, make_app: Any) -> None: # Arrange client = TestClient(make_app(fake_runner)) # Act / Assert assert client.get("/healthz").status_code == 200 def test_readyz_returns_503_when_binary_missing( monkeypatch: pytest.MonkeyPatch, fake_runner: _FakeRunner, make_app: Any, ) -> None: # Arrange — point readyz at a binary we know doesn't exist. monkeypatch.setenv("REPLAY_API_REPLAY_BINARY", "definitely-not-a-binary-az701") client = TestClient(make_app(fake_runner)) # Act response = client.get("/readyz") # Assert assert response.status_code == 503 assert "not on PATH" in response.json()["reason"] # --------------------------------------------------------------------- # AC-8 — concurrency limit enforced def test_concurrency_limit_queues_excess_jobs(storage: StorageRoot) -> None: # Arrange runner = _FakeRunner(delay_s=0.5) registry = JobRegistry( runner=runner, storage=storage, max_concurrent=1, max_queued=8 ) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10 ) client = TestClient(app) job_ids: list[str] = [] # Act — submit 3 in quick succession; sync_max_bytes=10 forces async mode. for _ in range(3): resp = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ( "k.json", _valid_calibration_bytes(), "application/json", ), }, ) assert resp.status_code == 202, resp.text job_ids.append(resp.json()["job_id"]) # Sample states quickly — at this instant we expect 1 running and ≥ 1 queued. states = [ client.get(f"/jobs/{jid}").json()["state"] for jid in job_ids ] assert states.count(JobState.RUNNING.value) <= 1, ( f"more than one running at once: {states}" ) assert ( states.count(JobState.QUEUED.value) >= 1 or states.count(JobState.DONE.value) >= 2 ), f"no queued state observed; states={states}" # Wait for everything to finish so the test exits cleanly. for jid in job_ids: _wait_done(client, jid) def test_queue_full_returns_429(storage: StorageRoot) -> None: # Arrange — max_queued=0 forces the 429 path on the second submit. runner = _FakeRunner(delay_s=0.5) registry = JobRegistry( runner=runner, storage=storage, max_concurrent=1, max_queued=0 ) app = create_app( runner=runner, storage=storage, registry=registry, sync_max_bytes=10 ) client = TestClient(app) # Act first = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) second = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) # Assert assert first.status_code == 202 assert second.status_code == 429 assert second.json()["error_code"] == "concurrency_limit_reached" _wait_done(client, first.json()["job_id"]) # --------------------------------------------------------------------- # AC-9 — magic-byte upload validation (HTTP path) def test_post_replay_rejects_misnamed_zip_as_tlog( fake_runner: _FakeRunner, make_app: Any ) -> None: # Arrange bogus_tlog = b"\x00\x00\x00\x00\x00\x00\x00\x00PK\x03\x04bogus" client = TestClient(make_app(fake_runner)) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", bogus_tlog, "application/octet-stream"), "video": ("d.mp4", _valid_mp4_bytes(), "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) # Assert assert response.status_code == 400 assert response.json()["error_code"] == "unsupported_file_kind" def test_post_replay_rejects_misnamed_zip_as_video( fake_runner: _FakeRunner, make_app: Any ) -> None: # Arrange bogus_video = b"\x00\x00\x00\x20notftyp..." + b"\x00" * 64 client = TestClient(make_app(fake_runner)) # Act response = client.post( "/replay", files={ "tlog": ("d.tlog", _valid_tlog_bytes(), "application/octet-stream"), "video": ("d.mp4", bogus_video, "video/mp4"), "calibration": ("k.json", _valid_calibration_bytes(), "application/json"), }, ) # Assert assert response.status_code == 400 assert response.json()["error_code"] == "unsupported_file_kind" # --------------------------------------------------------------------- # Helpers def _wait_done(client: TestClient, job_id: str, timeout_s: float = 10.0) -> None: """Block until ``job_id`` is in state ``done``.""" deadline = time.monotonic() + timeout_s while time.monotonic() < deadline: snap = client.get(f"/jobs/{job_id}").json() if snap["state"] == JobState.DONE.value: return if snap["state"] == JobState.FAILED.value: raise AssertionError(f"job {job_id} unexpectedly failed: {snap}") time.sleep(0.05) raise AssertionError(f"job {job_id} did not reach DONE within {timeout_s}s") def _wait_terminal( client: TestClient, job_id: str, timeout_s: float = 10.0 ) -> dict[str, Any]: deadline = time.monotonic() + timeout_s while time.monotonic() < deadline: snap = client.get(f"/jobs/{job_id}").json() if snap["state"] in {JobState.DONE.value, JobState.FAILED.value}: return snap time.sleep(0.05) raise AssertionError(f"job {job_id} did not reach terminal state") # Suppress unused-imports warnings for symbols only the test harness uses. _ = (os, threading, fastapi)