Files
gps-denied-onboard/tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py
Oleksandr Bezdieniezhnykh e4ecdaf619 [AZ-294] [AZ-295] [AZ-296] Finish C13: tile snapshot + record-kind policy + takeoff abort
AZ-294: MidFlightTileSnapshotSink writes orthorectified tile JPEGs
atomically to flight_root/<flight_id>/tiles/<tile_id>.jpg, emits a
kind="mid_flight_tile_snapshot" pointer record, and evicts the oldest
tile when the per-flight 64 MiB cap is exceeded. Adds optional
frame_id to the snapshot payload (fdr_record_schema bump).

AZ-295: RecordKindPolicy with two paired gates:
- enforce_or_raise (producer-side) raises RawFrameWriteForbiddenError
  for raw_nav_frame / raw_ai_cam_frame at the call site, defending
  AC-8.5 / RESTRICT-UAV-4.
- gate_for_writer (writer-side) tumbling-window rate-caps
  failed_tile_thumbnail records at <= 0.1 Hz; over-cap drops are
  coalesced into kind="overrun" records with the originating
  producer slug.

AZ-296: take_off() composition-root sequence with strict ordering
(writer.__init__ -> start -> open_flight -> fc_adapter.__init__ ->
fc_adapter.open). On FdrOpenError, logs ERROR record, calls
writer.stop(), prints the documented FATAL line to stderr, and
sys.exit(EXIT_FDR_OPEN_FAILURE=2). composition_root_protocol bumped
to v1.1.0 with the new constants + takeoff-sequence section.

29 new tests; full suite 356 passed / 2 skipped / 0 failures.
No new dependencies (stdlib only).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 03:52:07 +03:00

214 lines
6.7 KiB
Python

"""AZ-294 — MidFlightTileSnapshotSink unit tests."""
from __future__ import annotations
import struct
from datetime import datetime, timedelta, timezone
from pathlib import Path
from uuid import uuid4
import pytest
from gps_denied_onboard.components.c13_fdr import (
MidFlightTileSnapshotSink,
TileSnapshotInvalidIdError,
TileSnapshotTooLargeError,
)
from gps_denied_onboard.config import TileSnapshotConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import OVERRUN_KIND, parse
# Little-endian unsigned 32-bit struct ("<I", 4 bytes). Not referenced by any test
# visible in this section — presumably a leftover or used elsewhere; confirm before removing.
_LENGTH_PREFIX = struct.Struct("<I")
# First four bytes of every synthetic blob built by `_jpeg_blob`: the JPEG SOI marker
# (FF D8) followed by the JFIF APP0 segment marker (FF E0).
_JPEG_MAGIC = b"\xff\xd8\xff\xe0"
def _jpeg_blob(size: int = 1024) -> bytes:
    """Return a synthetic JPEG-like blob that is exactly ``size`` bytes long.

    The blob is ``_JPEG_MAGIC`` followed by zero padding. It is not a decodable
    image; it only carries the magic header and the requested length.

    Args:
        size: Total length of the returned blob in bytes.

    Returns:
        ``size`` bytes starting with ``_JPEG_MAGIC``.

    Raises:
        ValueError: If ``size`` is smaller than the magic header. Without this
            guard the negative repeat count would yield empty padding and the
            helper would silently return a blob longer than requested.
    """
    padding = size - len(_JPEG_MAGIC)
    if padding < 0:
        raise ValueError(
            f"size must be >= {len(_JPEG_MAGIC)} to hold the JPEG magic, got {size}"
        )
    return _JPEG_MAGIC + b"\x00" * padding
def _make_sink(
    tmp_path: Path,
    config: TileSnapshotConfig | None = None,
) -> tuple[MidFlightTileSnapshotSink, FdrClient]:
    """Build a sink rooted at ``tmp_path`` plus the FDR client it is wired to.

    Args:
        tmp_path: Directory passed to the sink as ``flight_root``.
        config: Sink configuration; a default ``TileSnapshotConfig()`` is used
            when this is omitted (or falsy).

    Returns:
        ``(sink, client)`` — the client is returned so tests can drain the
        records the sink enqueued. Each call uses a fresh ``uuid4()`` flight id.
    """
    fdr_client = FdrClient(
        producer_id="shared.tile_snapshot_sink",
        capacity=256,
        _emit_diag_log=False,
    )
    flight_id = uuid4()
    effective_config = config or TileSnapshotConfig()
    snapshot_sink = MidFlightTileSnapshotSink(
        flight_root=tmp_path,
        flight_id=flight_id,
        fdr_client=fdr_client,
        config=effective_config,
    )
    return snapshot_sink, fdr_client
def _drain_kinds(client: FdrClient) -> list[str]:
    """Drain up to 1024 records from ``client`` and return their ``kind`` values.

    The kinds are listed in the order ``client.drain`` yields the records.
    """
    kinds: list[str] = []
    for record in client.drain(max_records=1024):
        kinds.append(record.kind)
    return kinds
def test_ac1_write_snapshot_creates_canonical_jpeg(tmp_path: Path) -> None:
    """AC-1: the snapshot is stored as ``<tiles_dir>/<tile_id>.jpg`` with the given bytes."""
    # Arrange
    sink, _ = _make_sink(tmp_path)
    payload = _jpeg_blob(2048)
    captured_at = datetime(2026, 5, 11, tzinfo=timezone.utc)

    # Act
    written = sink.write_snapshot(
        tile_id="tile_001",
        jpeg_bytes=payload,
        captured_at=captured_at,
    )

    # Assert — file exists, is named after the tile id, holds the exact bytes,
    # and sits directly inside the sink's tiles directory.
    assert written.exists()
    assert written.name == "tile_001.jpg"
    assert written.read_bytes() == payload
    assert written.parent == sink.tiles_dir
def test_ac2_write_snapshot_emits_pointer_record(tmp_path: Path) -> None:
    """AC-2: one write enqueues exactly one ``mid_flight_tile_snapshot`` pointer record."""
    # Arrange
    sink, client = _make_sink(tmp_path)
    captured_at = datetime(2026, 5, 11, 12, 0, 0, tzinfo=timezone.utc)

    # Act
    sink.write_snapshot("tile_a", _jpeg_blob(), captured_at)
    records = client.drain(max_records=16)

    # Assert — a single record whose payload points at the relative tile path
    # and carries the capture time in ISO-8601 form.
    assert len(records) == 1
    pointer = records[0]
    assert pointer.kind == "mid_flight_tile_snapshot"
    assert pointer.payload["snapshot_path"] == "tiles/tile_a.jpg"
    assert pointer.payload["captured_at"] == captured_at.isoformat()
def test_ac3_oversize_jpeg_rejected(tmp_path: Path) -> None:
    """AC-3: a payload larger than ``jpeg_max_bytes`` is rejected without side effects."""
    # Arrange — the limit is 256 bytes and the payload is one byte over it.
    sink, client = _make_sink(tmp_path, TileSnapshotConfig(jpeg_max_bytes=256))
    oversize_payload = b"\x00" * 257

    # Act + Assert
    with pytest.raises(TileSnapshotTooLargeError, match=r"jpeg_max_bytes"):
        sink.write_snapshot("tile_a", oversize_payload, datetime.now(tz=timezone.utc))

    # No file is written; no pointer record enqueued.
    tiles_dir = sink.tiles_dir
    assert not (tiles_dir.exists() and any(tiles_dir.iterdir()))
    assert _drain_kinds(client) == []
def test_ac4_invalid_tile_id_rejected(tmp_path: Path) -> None:
    """AC-4: every malformed tile id raises ``TileSnapshotInvalidIdError`` and emits nothing."""
    # Arrange
    sink, client = _make_sink(tmp_path)
    rejected_ids = (
        "../etc/passwd",  # path traversal
        "tile with space",
        "../../e",  # path traversal
        "a" * 129,  # presumably one over the sink's length limit — confirm
        "",
    )

    # Act + Assert
    for bad_id in rejected_ids:
        with pytest.raises(TileSnapshotInvalidIdError):
            sink.write_snapshot(bad_id, _jpeg_blob(), datetime.now(tz=timezone.utc))

    # None of the rejected writes may have enqueued a record.
    assert _drain_kinds(client) == []
def test_ac5_atomic_write_temp_file_cleaned(tmp_path: Path) -> None:
    """AC-5: after a successful write no ``*.tmp`` file remains in the tiles directory."""
    # Arrange
    sink, _ = _make_sink(tmp_path)

    # Act
    sink.write_snapshot("tile_b", _jpeg_blob(), datetime.now(tz=timezone.utc))

    # Assert — no leftover `.tmp` file in the tiles directory
    stray_tmp_files = []
    for entry in sink.tiles_dir.iterdir():
        if entry.name.endswith(".tmp"):
            stray_tmp_files.append(entry)
    assert stray_tmp_files == []
def test_ac6_cap_drop_oldest_when_exceeded(tmp_path: Path) -> None:
    """AC-6: exceeding the per-flight byte cap evicts the oldest tile and logs one overrun."""
    # Arrange: cap = 4 KiB; each JPEG = 2 KiB → 3rd write must evict 1st.
    kib = 1024
    sink, client = _make_sink(
        tmp_path,
        TileSnapshotConfig(tile_snapshot_cap_bytes=4 * kib, jpeg_max_bytes=3 * kib),
    )
    tile_bytes = _jpeg_blob(2 * kib)
    first_capture = datetime(2026, 5, 11, tzinfo=timezone.utc)

    # Act — three writes whose capture times are one second apart
    for offset_s, tile_id in enumerate(("tile_1", "tile_2", "tile_3")):
        sink.write_snapshot(tile_id, tile_bytes, first_capture + timedelta(seconds=offset_s))

    # Assert — tile_1 evicted; tile_2 + tile_3 survive
    on_disk = {entry.name for entry in sink.tiles_dir.iterdir()}
    assert "tile_1.jpg" not in on_disk
    assert "tile_2.jpg" in on_disk
    assert "tile_3.jpg" in on_disk

    # One overrun record for the eviction, one pointer record per write.
    kinds = [record.kind for record in client.drain(max_records=64)]
    assert kinds.count(OVERRUN_KIND) == 1
    assert kinds.count("mid_flight_tile_snapshot") == 3
def test_ac7_thread_safe_concurrent_writes(tmp_path: Path) -> None:
    """AC-7: eight concurrent writers each produce one tile file and one pointer record.

    Every writer thread stores a distinct tile id. The test fails if any writer
    raises, if any writer is still running after the join timeout, or if the
    number of ``.jpg`` files / pointer records differs from eight.
    """
    # Arrange
    import threading

    sink, client = _make_sink(tmp_path)
    errors: list[BaseException] = []

    def writer(idx: int) -> None:
        try:
            sink.write_snapshot(
                f"tile_{idx:03d}",
                _jpeg_blob(1024),
                datetime.now(tz=timezone.utc),
            )
        except BaseException as exc:
            # Exceptions raised in a worker thread never reach the test body,
            # so collect them here and assert on the list afterwards.
            errors.append(exc)

    # Act
    # daemon=True so that a writer stuck inside the sink cannot keep the
    # interpreter alive after the test has already failed.
    threads = [
        threading.Thread(target=writer, args=(i,), daemon=True) for i in range(8)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=2.0)

    # Assert — join() returns None even when it times out, so check liveness
    # explicitly; otherwise the assertions below race against running writers.
    stuck = [t.name for t in threads if t.is_alive()]
    assert stuck == [], f"writer threads still running after join timeout: {stuck}"

    # Assert — all 8 tiles written; 8 pointer records emitted
    assert errors == []
    assert sum(1 for _p in sink.tiles_dir.iterdir() if _p.suffix == ".jpg") == 8
    kinds = [r.kind for r in client.drain(max_records=64)]
    assert kinds.count("mid_flight_tile_snapshot") == 8
def test_ac8_frame_id_optional_in_payload(tmp_path: Path) -> None:
    """AC-8: ``frame_id`` appears in the payload only when the caller supplies it."""
    # Arrange
    sink, client = _make_sink(tmp_path)

    # Act-1: frame_id supplied
    sink.write_snapshot("tile_c", _jpeg_blob(), datetime.now(tz=timezone.utc), frame_id=42)
    with_frame_id = client.drain(max_records=16)

    assert len(with_frame_id) == 1
    payload_with_id = with_frame_id[0].payload
    assert payload_with_id["frame_id"] == 42

    # Act-2: frame_id omitted
    sink.write_snapshot("tile_d", _jpeg_blob(), datetime.now(tz=timezone.utc))
    without_frame_id = client.drain(max_records=16)

    assert len(without_frame_id) == 1
    payload_without_id = without_frame_id[0].payload
    assert "frame_id" not in payload_without_id
def test_ac9_roundtrip_through_parse(tmp_path: Path) -> None:
    """Pointer record survives serialise/parse roundtrip (AZ-272 v1.1)."""
    from gps_denied_onboard.fdr_client.records import serialise

    # Arrange
    sink, client = _make_sink(tmp_path)
    captured_at = datetime(2026, 5, 11, 9, 0, 0, tzinfo=timezone.utc)

    # Act — write one snapshot, then push its pointer record through the codec
    sink.write_snapshot("tile_r", _jpeg_blob(), captured_at, frame_id=7)
    drained = client.drain(max_records=16)
    assert len(drained) == 1
    encoded = serialise(drained[0])
    decoded = parse(encoded)

    # Assert — kind and every payload field come back unchanged
    assert decoded.kind == "mid_flight_tile_snapshot"
    payload = decoded.payload
    assert payload["snapshot_path"] == "tiles/tile_r.jpg"
    assert payload["captured_at"] == captured_at.isoformat()
    assert payload["frame_id"] == 7