"""AZ-294 — MidFlightTileSnapshotSink unit tests.""" from __future__ import annotations import struct from datetime import datetime, timedelta, timezone from pathlib import Path from uuid import uuid4 import pytest from gps_denied_onboard.components.c13_fdr import ( MidFlightTileSnapshotSink, TileSnapshotInvalidIdError, TileSnapshotTooLargeError, ) from gps_denied_onboard.config import TileSnapshotConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import OVERRUN_KIND, parse _LENGTH_PREFIX = struct.Struct(" bytes: return _JPEG_MAGIC + b"\x00" * (size - len(_JPEG_MAGIC)) def _make_sink( tmp_path: Path, config: TileSnapshotConfig | None = None, ) -> tuple[MidFlightTileSnapshotSink, FdrClient]: client = FdrClient(producer_id="shared.tile_snapshot_sink", capacity=256, _emit_diag_log=False) sink = MidFlightTileSnapshotSink( flight_root=tmp_path, flight_id=uuid4(), fdr_client=client, config=config or TileSnapshotConfig(), ) return sink, client def _drain_kinds(client: FdrClient) -> list[str]: return [rec.kind for rec in client.drain(max_records=1024)] def test_ac1_write_snapshot_creates_canonical_jpeg(tmp_path: Path) -> None: # Arrange sink, _client = _make_sink(tmp_path) blob = _jpeg_blob(2048) # Act path = sink.write_snapshot( tile_id="tile_001", jpeg_bytes=blob, captured_at=datetime(2026, 5, 11, tzinfo=timezone.utc), ) # Assert assert path.exists() assert path.name == "tile_001.jpg" assert path.read_bytes() == blob assert path.parent == sink.tiles_dir def test_ac2_write_snapshot_emits_pointer_record(tmp_path: Path) -> None: # Arrange sink, client = _make_sink(tmp_path) captured = datetime(2026, 5, 11, 12, 0, 0, tzinfo=timezone.utc) # Act sink.write_snapshot("tile_a", _jpeg_blob(), captured) batch = client.drain(max_records=16) # Assert assert len(batch) == 1 rec = batch[0] assert rec.kind == "mid_flight_tile_snapshot" assert rec.payload["snapshot_path"] == "tiles/tile_a.jpg" assert rec.payload["captured_at"] == captured.isoformat() def test_ac3_oversize_jpeg_rejected(tmp_path: Path) -> None: # Arrange config = TileSnapshotConfig(jpeg_max_bytes=256) sink, client = _make_sink(tmp_path, config) # Act + Assert with pytest.raises(TileSnapshotTooLargeError, match=r"jpeg_max_bytes"): sink.write_snapshot("tile_a", b"\x00" * 257, datetime.now(tz=timezone.utc)) # No file is written; no pointer record enqueued. assert not sink.tiles_dir.exists() or not any(sink.tiles_dir.iterdir()) assert _drain_kinds(client) == [] def test_ac4_invalid_tile_id_rejected(tmp_path: Path) -> None: # Arrange sink, client = _make_sink(tmp_path) invalid_ids = ["../etc/passwd", "tile with space", "../../e", "a" * 129, ""] # Act + Assert for tile_id in invalid_ids: with pytest.raises(TileSnapshotInvalidIdError): sink.write_snapshot(tile_id, _jpeg_blob(), datetime.now(tz=timezone.utc)) assert _drain_kinds(client) == [] def test_ac5_atomic_write_temp_file_cleaned(tmp_path: Path) -> None: # Arrange sink, _client = _make_sink(tmp_path) # Act sink.write_snapshot("tile_b", _jpeg_blob(), datetime.now(tz=timezone.utc)) # Assert — no leftover `.tmp` file in the tiles directory leftovers = [p for p in sink.tiles_dir.iterdir() if p.name.endswith(".tmp")] assert leftovers == [] def test_ac6_cap_drop_oldest_when_exceeded(tmp_path: Path) -> None: # Arrange: cap = 4 KiB; each JPEG = 2 KiB → 3rd write must evict 1st. config = TileSnapshotConfig( tile_snapshot_cap_bytes=4 * 1024, jpeg_max_bytes=3 * 1024, ) sink, client = _make_sink(tmp_path, config) blob = _jpeg_blob(2 * 1024) t0 = datetime(2026, 5, 11, tzinfo=timezone.utc) # Act sink.write_snapshot("tile_1", blob, t0) sink.write_snapshot("tile_2", blob, t0 + timedelta(seconds=1)) sink.write_snapshot("tile_3", blob, t0 + timedelta(seconds=2)) # Assert — tile_1 evicted; tile_2 + tile_3 survive surviving = sorted(p.name for p in sink.tiles_dir.iterdir()) assert "tile_1.jpg" not in surviving assert "tile_2.jpg" in surviving assert "tile_3.jpg" in surviving kinds = [r.kind for r in client.drain(max_records=64)] assert kinds.count(OVERRUN_KIND) == 1 assert kinds.count("mid_flight_tile_snapshot") == 3 def test_ac7_thread_safe_concurrent_writes(tmp_path: Path) -> None: # Arrange import threading sink, client = _make_sink(tmp_path) errors: list[BaseException] = [] def writer(idx: int) -> None: try: sink.write_snapshot( f"tile_{idx:03d}", _jpeg_blob(1024), datetime.now(tz=timezone.utc), ) except BaseException as exc: errors.append(exc) # Act threads = [threading.Thread(target=writer, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join(timeout=2.0) # Assert — all 8 tiles written; 8 pointer records emitted assert errors == [] assert sum(1 for _p in sink.tiles_dir.iterdir() if _p.suffix == ".jpg") == 8 kinds = [r.kind for r in client.drain(max_records=64)] assert kinds.count("mid_flight_tile_snapshot") == 8 def test_ac8_frame_id_optional_in_payload(tmp_path: Path) -> None: # Arrange sink, client = _make_sink(tmp_path) # Act sink.write_snapshot("tile_c", _jpeg_blob(), datetime.now(tz=timezone.utc), frame_id=42) batch = client.drain(max_records=16) assert len(batch) == 1 assert batch[0].payload["frame_id"] == 42 # Act-2: frame_id omitted sink.write_snapshot("tile_d", _jpeg_blob(), datetime.now(tz=timezone.utc)) batch2 = client.drain(max_records=16) assert len(batch2) == 1 assert "frame_id" not in batch2[0].payload def test_ac9_roundtrip_through_parse(tmp_path: Path) -> None: """Pointer record survives serialise/parse roundtrip (AZ-272 v1.1).""" # Arrange sink, client = _make_sink(tmp_path) captured = datetime(2026, 5, 11, 9, 0, 0, tzinfo=timezone.utc) # Act sink.write_snapshot("tile_r", _jpeg_blob(), captured, frame_id=7) batch = client.drain(max_records=16) assert len(batch) == 1 rec = batch[0] from gps_denied_onboard.fdr_client.records import serialise roundtrip = parse(serialise(rec)) # Assert assert roundtrip.kind == "mid_flight_tile_snapshot" assert roundtrip.payload["snapshot_path"] == "tiles/tile_r.jpg" assert roundtrip.payload["captured_at"] == captured.isoformat() assert roundtrip.payload["frame_id"] == 7