[AZ-421] Batch 82: FT-P-15 + FT-P-16 + FT-P-18 cache / offline / no-raw-retention

FT-P-15: parse FDR `cache-self-check` records; assert every tile-manifest
entry has CRS, tile_matrix, dimension, m_per_px, capture_date, source,
compression; m_per_px >= 0.5 (or rejected by FDR `tile-load-rejected`).

FT-P-16: read `docker network inspect e2e-net` + `docker inspect <sut>`
snapshots; assert `Internal == true` AND SUT attached only to e2e-net.
The 0-egress semantic of AC-8.3 is enforced structurally.

FT-P-18: walk FDR + tile-cache, probe JPEG dimensions via stdlib SOF
parser, reject any file matching nav-camera raw pattern (5472x3648 or
880x720). Extrapolate thumbnail-log size to 8h; assert < 1 GB.

Adds runner.helpers.tile_cache_inspector with five evaluators
(manifest schema, offline mode, raw-frame detection, thumbnail budget,
JPEG dimension probe) + walk_files helper. Pure-logic coverage: 43
new unit tests; full e2e/_unit_tests/ suite 793 passing (was 746).
Scenarios skip locally when SITL replay fixture or docker-inspect
env vars are missing; production hooks (cache-self-check FDR record,
tile-load-rejected events, docker-inspect snapshots) are tracked
outside this task.

See _docs/03_implementation/batch_82_report.md +
reviews/batch_82_review.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-17 15:09:58 +03:00
parent b0296da911
commit 7d1288e4ba
9 changed files with 1693 additions and 3 deletions
+427
View File
@@ -0,0 +1,427 @@
"""Tile-cache + storage compliance evaluators (AZ-421 / FT-P-15/16/18).
Pure-logic evaluators sourced from:
* **FDR archive** — the SUT's startup ``cache-self-check`` record carries
the tile manifest entries the freshness/source/CRS contract has to
hold over (FT-P-15 / AC-8.1, AC-NEW-2).
* **Docker network + container inspect JSON** — verifies the SUT
container is attached only to the ``e2e-net`` network and the
network is configured with ``Internal: true`` (FT-P-16 / AC-8.3,
RESTRICT-SAT-1).
* **Filesystem walks** of ``${FDR_OUTPUT}`` and ``/var/azaion/tile-cache``
— verifies the SUT does NOT retain raw nav-camera / AI-camera
frames (FT-P-18 / AC-8.5).
The shared shape across all three sub-scenarios is the
``X...Report(passes: bool)`` dataclass — a scenario test that wants to
assert all three pulls the report objects and asserts ``passes``.
Public-boundary discipline: this module imports nothing from
``src/gps_denied_onboard``. Inputs are filesystem paths, parsed FDR
records, and dicts decoded from ``docker network inspect`` /
``docker inspect`` JSON.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
# ─────────────────────────── FT-P-15 / AC-8.1 ───────────────────────────
MANIFEST_M_PER_PX_FLOOR = 0.5
MANIFEST_REQUIRED_FIELDS: tuple[str, ...] = (
"crs",
"tile_matrix",
"dimension",
"m_per_px",
"capture_date",
"source",
"compression",
)
CACHE_SELF_CHECK_FDR_KIND = "cache-self-check"
TILE_LOAD_REJECTED_FDR_KIND = "tile-load-rejected"
@dataclass(frozen=True)
class ManifestEntryReport:
"""Per-entry result of the manifest schema + resolution-floor checks."""
entry_id: str
missing_fields: tuple[str, ...]
m_per_px: float | None
@property
def has_all_fields(self) -> bool:
return not self.missing_fields
@property
def passes_floor(self) -> bool:
return self.m_per_px is not None and self.m_per_px >= MANIFEST_M_PER_PX_FLOOR
@property
def passes(self) -> bool:
return self.has_all_fields and self.passes_floor
@dataclass(frozen=True)
class ManifestSchemaReport:
"""AC-1 + AC-2 of FT-P-15: schema completeness + resolution floor."""
entries: tuple[ManifestEntryReport, ...]
rejected_below_floor_ids: tuple[str, ...]
m_per_px_floor: float = MANIFEST_M_PER_PX_FLOOR
@property
def total_entries(self) -> int:
return len(self.entries)
@property
def entries_with_missing_fields(self) -> tuple[ManifestEntryReport, ...]:
return tuple(e for e in self.entries if not e.has_all_fields)
@property
def entries_below_floor(self) -> tuple[ManifestEntryReport, ...]:
return tuple(e for e in self.entries if e.m_per_px is not None and not e.passes_floor)
@property
def passes(self) -> bool:
if not self.entries:
return False
if self.entries_with_missing_fields:
return False
for entry in self.entries:
if entry.m_per_px is None:
return False
if entry.passes_floor:
continue
# below floor — must be rejected at load
if entry.entry_id not in self.rejected_below_floor_ids:
return False
return True
def evaluate_manifest_schema(
manifest_entries: Sequence[dict],
*,
tile_load_rejected_ids: Sequence[str] = (),
m_per_px_floor: float = MANIFEST_M_PER_PX_FLOOR,
required_fields: Sequence[str] = MANIFEST_REQUIRED_FIELDS,
) -> ManifestSchemaReport:
"""Evaluate AC-1 + AC-2 of FT-P-15 against parsed manifest entries.
Each ``manifest_entries`` element is the ``payload.entries[i]`` dict
extracted from an FDR ``cache-self-check`` record. ``entry_id`` is
looked up under ``"id"`` then ``"tile_id"`` then synthesised from
the entry's index — scenarios should prefer ``"id"`` if their
schema names it differently and adjust upstream.
``tile_load_rejected_ids`` is the set of tile IDs the SUT has
rejected at load time via FDR ``tile-load-rejected`` events; an
entry with ``m_per_px < floor`` only passes if its ID appears in
this set.
"""
if m_per_px_floor <= 0:
raise ValueError(f"m_per_px_floor must be > 0, got {m_per_px_floor}")
rejected = tuple(tile_load_rejected_ids)
entries: list[ManifestEntryReport] = []
for idx, entry in enumerate(manifest_entries):
entry_id = _resolve_entry_id(entry, idx)
missing = tuple(f for f in required_fields if f not in entry)
raw_m_per_px = entry.get("m_per_px")
m_per_px: float | None
if isinstance(raw_m_per_px, (int, float)):
m_per_px = float(raw_m_per_px)
else:
m_per_px = None
entries.append(
ManifestEntryReport(
entry_id=entry_id,
missing_fields=missing,
m_per_px=m_per_px,
)
)
return ManifestSchemaReport(
entries=tuple(entries),
rejected_below_floor_ids=rejected,
m_per_px_floor=m_per_px_floor,
)
def _resolve_entry_id(entry: dict, idx: int) -> str:
for key in ("id", "tile_id", "tile_matrix"):
if key in entry and isinstance(entry[key], str) and entry[key]:
return entry[key]
return f"entry_{idx}"
# ─────────────────────────── FT-P-16 / AC-8.3 ───────────────────────────
E2E_NETWORK_NAME = "e2e-net"
@dataclass(frozen=True)
class OfflineModeReport:
"""AC-3 of FT-P-16: SUT container is on `e2e-net` only and the net is internal."""
network_name: str
network_internal: bool | None
container_networks: tuple[str, ...]
expected_network: str = E2E_NETWORK_NAME
@property
def container_has_only_expected_network(self) -> bool:
return self.container_networks == (self.expected_network,)
@property
def passes(self) -> bool:
if self.network_internal is not True:
return False
return self.container_has_only_expected_network
def evaluate_offline_mode(
network_inspect: dict,
container_inspect: dict,
*,
expected_network: str = E2E_NETWORK_NAME,
) -> OfflineModeReport:
"""Evaluate AC-3 of FT-P-16 from ``docker network inspect`` + ``docker inspect``.
``network_inspect`` is a single network object (the JSON shape
``docker network inspect <name>`` returns inside a list — the
scenario unwraps the list). Required key: ``Internal: bool``.
``container_inspect`` is a single container object. Required key
path: ``NetworkSettings.Networks`` (a dict whose keys are network
names the container is attached to).
"""
network_internal = network_inspect.get("Internal")
if not isinstance(network_internal, bool):
network_internal = None
nets_map = (
container_inspect.get("NetworkSettings", {}).get("Networks", {})
if isinstance(container_inspect.get("NetworkSettings"), dict)
else {}
)
container_networks: tuple[str, ...] = (
tuple(sorted(nets_map.keys())) if isinstance(nets_map, dict) else ()
)
return OfflineModeReport(
network_name=str(network_inspect.get("Name", "")),
network_internal=network_internal,
container_networks=container_networks,
expected_network=expected_network,
)
# ─────────────────────────── FT-P-18 / AC-8.5 ───────────────────────────
NAV_CAMERA_RAW_DIMENSIONS = (5472, 3648)
NAV_CAMERA_DECODED_DIMENSIONS = (880, 720)
RAW_FRAME_EXTENSIONS = (".jpg", ".jpeg", ".raw", ".dng", ".cr2", ".nef", ".arw", ".bin")
THUMBNAIL_LOG_EXTENSIONS = (".log", ".jsonl", ".txt")
THUMBNAIL_LOG_MAX_SIZE_GB_PER_8H = 1.0
THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H = int(THUMBNAIL_LOG_MAX_SIZE_GB_PER_8H * 1024**3)
@dataclass(frozen=True)
class RawFrameCandidate:
"""One filesystem entry that matched the raw-frame heuristic."""
path: Path
size_bytes: int
dimensions: tuple[int, int] | None
reason: str
@dataclass(frozen=True)
class RawFrameDetectionReport:
"""AC-4 of FT-P-18: zero raw-frame retention."""
candidates: tuple[RawFrameCandidate, ...]
nav_camera_raw_dimensions: tuple[int, int] = NAV_CAMERA_RAW_DIMENSIONS
nav_camera_decoded_dimensions: tuple[int, int] = NAV_CAMERA_DECODED_DIMENSIONS
@property
def candidate_count(self) -> int:
return len(self.candidates)
@property
def passes(self) -> bool:
return self.candidate_count == 0
def detect_raw_frames(
file_specs: Iterable[tuple[Path, int, tuple[int, int] | None]],
*,
raw_dimensions: tuple[int, int] = NAV_CAMERA_RAW_DIMENSIONS,
decoded_dimensions: tuple[int, int] = NAV_CAMERA_DECODED_DIMENSIONS,
raw_extensions: Sequence[str] = RAW_FRAME_EXTENSIONS,
) -> RawFrameDetectionReport:
"""AC-4: detect any file whose extension + dimensions match raw nav frames.
``file_specs`` is an iterable of ``(path, size_bytes, dimensions)``
triples. The scenario test produces this by walking the filesystem
and probing each image file's dimensions; this evaluator only
decides *which* of those triples count as raw frames.
A file matches when:
1. Extension is in ``raw_extensions``, AND
2. ``dimensions`` equals either the raw nav-cam dims (5472×3648,
order-insensitive) OR the H.264-decoded dims (880×720,
order-insensitive).
A file with a raw extension but unknown dimensions does NOT match
(the scenario is expected to fail dimension probe loudly, not be
silently absorbed by the evaluator).
"""
targets = {tuple(sorted(raw_dimensions)), tuple(sorted(decoded_dimensions))}
raw_ext_lower = tuple(ext.lower() for ext in raw_extensions)
candidates: list[RawFrameCandidate] = []
for path, size_bytes, dims in file_specs:
if path.suffix.lower() not in raw_ext_lower:
continue
if dims is None:
continue
if tuple(sorted(dims)) not in targets:
continue
candidates.append(
RawFrameCandidate(
path=path,
size_bytes=size_bytes,
dimensions=dims,
reason=(
f"extension {path.suffix} + dimensions {dims} match nav-camera raw pattern"
),
)
)
return RawFrameDetectionReport(
candidates=tuple(candidates),
nav_camera_raw_dimensions=raw_dimensions,
nav_camera_decoded_dimensions=decoded_dimensions,
)
@dataclass(frozen=True)
class ThumbnailLogBudgetReport:
"""AC-5 of FT-P-18: thumbnail log size budget under 1 GB / 8 h."""
observed_size_bytes: int
observed_duration_h: float
extrapolated_8h_size_bytes: int
max_size_bytes_per_8h: int = THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H
@property
def passes(self) -> bool:
if self.observed_duration_h <= 0:
return False
return self.extrapolated_8h_size_bytes < self.max_size_bytes_per_8h
def evaluate_thumbnail_budget(
observed_size_bytes: int,
observed_duration_h: float,
*,
max_size_bytes_per_8h: int = THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H,
) -> ThumbnailLogBudgetReport:
"""AC-5: extrapolate observed thumbnail log size to an 8h flight.
``observed_size_bytes`` is the sum of every thumbnail-log file
under the FDR + cache walk (extensions in
``THUMBNAIL_LOG_EXTENSIONS``). ``observed_duration_h`` is the
wall-clock duration of the replay segment that produced them.
Extrapolation is linear: ``size * (8 / duration_h)``.
Returns a report whose ``passes`` flag holds when
``extrapolated_8h_size_bytes < max_size_bytes_per_8h``.
"""
if observed_size_bytes < 0:
raise ValueError(f"observed_size_bytes must be ≥0, got {observed_size_bytes}")
if max_size_bytes_per_8h <= 0:
raise ValueError(
f"max_size_bytes_per_8h must be >0, got {max_size_bytes_per_8h}"
)
if observed_duration_h <= 0:
extrapolated = -1
else:
extrapolated = int(observed_size_bytes * (8.0 / observed_duration_h))
return ThumbnailLogBudgetReport(
observed_size_bytes=observed_size_bytes,
observed_duration_h=observed_duration_h,
extrapolated_8h_size_bytes=extrapolated,
max_size_bytes_per_8h=max_size_bytes_per_8h,
)
# ─────────────────────── Filesystem walk helpers ───────────────────────
def walk_files(*roots: Path) -> Iterable[Path]:
"""Recursive file iterator over every existing root.
Convenience for the FT-P-18 scenario: stitch together
``fdr_archive_root`` + ``tile_cache_root`` walks under one call.
Non-existent roots are silently skipped (the FDR archive may be
absent on a skip-gated local run — the scenario explicitly checks
that elsewhere).
"""
for root in roots:
if not root.exists():
continue
for p in root.rglob("*"):
if p.is_file():
yield p
def probe_jpeg_dimensions(path: Path) -> tuple[int, int] | None:
"""Return ``(width, height)`` of a JPEG by parsing its SOF marker.
Pure-stdlib JPEG SOF0/SOF1/SOF2 parser — avoids loading the full
image (so a directory walk over hundreds of files is cheap) and
avoids a runtime dep on Pillow/OpenCV here (both are available in
the runner but adding them as a hard import would couple the
evaluator to those packages for what is fundamentally a 32-byte
header read).
Returns ``None`` if the file is not a JPEG, the SOF marker is not
present, or the file is truncated.
"""
try:
with path.open("rb") as fh:
head = fh.read(2)
if head != b"\xff\xd8":
return None
while True:
marker_prefix = fh.read(1)
if not marker_prefix:
return None
if marker_prefix != b"\xff":
return None
marker = fh.read(1)
if not marker:
return None
# SOF markers: 0xC0-0xCF except 0xC4 (DHT), 0xC8 (JPG), 0xCC (DAC)
if marker[0] in (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF):
fh.read(3) # segment length (2) + precision (1)
h_bytes = fh.read(2)
w_bytes = fh.read(2)
if len(h_bytes) != 2 or len(w_bytes) != 2:
return None
height = int.from_bytes(h_bytes, "big")
width = int.from_bytes(w_bytes, "big")
return (width, height)
seg_len_bytes = fh.read(2)
if len(seg_len_bytes) != 2:
return None
seg_len = int.from_bytes(seg_len_bytes, "big")
if seg_len < 2:
return None
fh.seek(seg_len - 2, 1)
except OSError:
return None