Files
Oleksandr Bezdieniezhnykh 7d1288e4ba [AZ-421] Batch 82: FT-P-15 + FT-P-16 + FT-P-18 cache / offline / no-raw-retention
FT-P-15: parse FDR `cache-self-check` records; assert every tile-manifest
entry has CRS, tile_matrix, dimension, m_per_px, capture_date, source,
compression; m_per_px >= 0.5 (or rejected by FDR `tile-load-rejected`).

FT-P-16: read `docker network inspect e2e-net` + `docker inspect <sut>`
snapshots; assert `Internal == true` AND SUT attached only to e2e-net.
The 0-egress semantic of AC-8.3 is enforced structurally.

FT-P-18: walk FDR + tile-cache, probe JPEG dimensions via stdlib SOF
parser, reject any file matching nav-camera raw pattern (5472x3648 or
880x720). Extrapolate thumbnail-log size to 8h; assert < 1 GB.

Adds runner.helpers.tile_cache_inspector with five evaluators
(manifest schema, offline mode, raw-frame detection, thumbnail budget,
JPEG dimension probe) + walk_files helper. Pure-logic coverage: 43
new unit tests; full e2e/_unit_tests/ suite 793 passing (was 746).
Scenarios skip locally when SITL replay fixture or docker-inspect
env vars are missing; production hooks (cache-self-check FDR record,
tile-load-rejected events, docker-inspect snapshots) are tracked
outside this task.

See _docs/03_implementation/batch_82_report.md +
reviews/batch_82_review.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 15:09:58 +03:00

428 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tile-cache + storage compliance evaluators (AZ-421 / FT-P-15/16/18).
Pure-logic evaluators sourced from:
* **FDR archive** — the SUT's startup ``cache-self-check`` record carries
the tile manifest entries the freshness/source/CRS contract has to
hold over (FT-P-15 / AC-8.1, AC-NEW-2).
* **Docker network + container inspect JSON** — verifies the SUT
container is attached only to the ``e2e-net`` network and the
network is configured with ``Internal: true`` (FT-P-16 / AC-8.3,
RESTRICT-SAT-1).
* **Filesystem walks** of ``${FDR_OUTPUT}`` and ``/var/azaion/tile-cache``
— verifies the SUT does NOT retain raw nav-camera / AI-camera
frames (FT-P-18 / AC-8.5).
The shared shape across all three sub-scenarios is the
``X...Report(passes: bool)`` dataclass — a scenario test that wants to
assert all three pulls the report objects and asserts ``passes``.
Public-boundary discipline: this module imports nothing from
``src/gps_denied_onboard``. Inputs are filesystem paths, parsed FDR
records, and dicts decoded from ``docker network inspect`` /
``docker inspect`` JSON.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
# ─────────────────────────── FT-P-15 / AC-8.1 ───────────────────────────
MANIFEST_M_PER_PX_FLOOR = 0.5
MANIFEST_REQUIRED_FIELDS: tuple[str, ...] = (
"crs",
"tile_matrix",
"dimension",
"m_per_px",
"capture_date",
"source",
"compression",
)
CACHE_SELF_CHECK_FDR_KIND = "cache-self-check"
TILE_LOAD_REJECTED_FDR_KIND = "tile-load-rejected"
@dataclass(frozen=True)
class ManifestEntryReport:
"""Per-entry result of the manifest schema + resolution-floor checks."""
entry_id: str
missing_fields: tuple[str, ...]
m_per_px: float | None
@property
def has_all_fields(self) -> bool:
return not self.missing_fields
@property
def passes_floor(self) -> bool:
return self.m_per_px is not None and self.m_per_px >= MANIFEST_M_PER_PX_FLOOR
@property
def passes(self) -> bool:
return self.has_all_fields and self.passes_floor
@dataclass(frozen=True)
class ManifestSchemaReport:
"""AC-1 + AC-2 of FT-P-15: schema completeness + resolution floor."""
entries: tuple[ManifestEntryReport, ...]
rejected_below_floor_ids: tuple[str, ...]
m_per_px_floor: float = MANIFEST_M_PER_PX_FLOOR
@property
def total_entries(self) -> int:
return len(self.entries)
@property
def entries_with_missing_fields(self) -> tuple[ManifestEntryReport, ...]:
return tuple(e for e in self.entries if not e.has_all_fields)
@property
def entries_below_floor(self) -> tuple[ManifestEntryReport, ...]:
return tuple(e for e in self.entries if e.m_per_px is not None and not e.passes_floor)
@property
def passes(self) -> bool:
if not self.entries:
return False
if self.entries_with_missing_fields:
return False
for entry in self.entries:
if entry.m_per_px is None:
return False
if entry.passes_floor:
continue
# below floor — must be rejected at load
if entry.entry_id not in self.rejected_below_floor_ids:
return False
return True
def evaluate_manifest_schema(
manifest_entries: Sequence[dict],
*,
tile_load_rejected_ids: Sequence[str] = (),
m_per_px_floor: float = MANIFEST_M_PER_PX_FLOOR,
required_fields: Sequence[str] = MANIFEST_REQUIRED_FIELDS,
) -> ManifestSchemaReport:
"""Evaluate AC-1 + AC-2 of FT-P-15 against parsed manifest entries.
Each ``manifest_entries`` element is the ``payload.entries[i]`` dict
extracted from an FDR ``cache-self-check`` record. ``entry_id`` is
looked up under ``"id"`` then ``"tile_id"`` then synthesised from
the entry's index — scenarios should prefer ``"id"`` if their
schema names it differently and adjust upstream.
``tile_load_rejected_ids`` is the set of tile IDs the SUT has
rejected at load time via FDR ``tile-load-rejected`` events; an
entry with ``m_per_px < floor`` only passes if its ID appears in
this set.
"""
if m_per_px_floor <= 0:
raise ValueError(f"m_per_px_floor must be > 0, got {m_per_px_floor}")
rejected = tuple(tile_load_rejected_ids)
entries: list[ManifestEntryReport] = []
for idx, entry in enumerate(manifest_entries):
entry_id = _resolve_entry_id(entry, idx)
missing = tuple(f for f in required_fields if f not in entry)
raw_m_per_px = entry.get("m_per_px")
m_per_px: float | None
if isinstance(raw_m_per_px, (int, float)):
m_per_px = float(raw_m_per_px)
else:
m_per_px = None
entries.append(
ManifestEntryReport(
entry_id=entry_id,
missing_fields=missing,
m_per_px=m_per_px,
)
)
return ManifestSchemaReport(
entries=tuple(entries),
rejected_below_floor_ids=rejected,
m_per_px_floor=m_per_px_floor,
)
def _resolve_entry_id(entry: dict, idx: int) -> str:
for key in ("id", "tile_id", "tile_matrix"):
if key in entry and isinstance(entry[key], str) and entry[key]:
return entry[key]
return f"entry_{idx}"
# ─────────────────────────── FT-P-16 / AC-8.3 ───────────────────────────
E2E_NETWORK_NAME = "e2e-net"
@dataclass(frozen=True)
class OfflineModeReport:
"""AC-3 of FT-P-16: SUT container is on `e2e-net` only and the net is internal."""
network_name: str
network_internal: bool | None
container_networks: tuple[str, ...]
expected_network: str = E2E_NETWORK_NAME
@property
def container_has_only_expected_network(self) -> bool:
return self.container_networks == (self.expected_network,)
@property
def passes(self) -> bool:
if self.network_internal is not True:
return False
return self.container_has_only_expected_network
def evaluate_offline_mode(
network_inspect: dict,
container_inspect: dict,
*,
expected_network: str = E2E_NETWORK_NAME,
) -> OfflineModeReport:
"""Evaluate AC-3 of FT-P-16 from ``docker network inspect`` + ``docker inspect``.
``network_inspect`` is a single network object (the JSON shape
``docker network inspect <name>`` returns inside a list — the
scenario unwraps the list). Required key: ``Internal: bool``.
``container_inspect`` is a single container object. Required key
path: ``NetworkSettings.Networks`` (a dict whose keys are network
names the container is attached to).
"""
network_internal = network_inspect.get("Internal")
if not isinstance(network_internal, bool):
network_internal = None
nets_map = (
container_inspect.get("NetworkSettings", {}).get("Networks", {})
if isinstance(container_inspect.get("NetworkSettings"), dict)
else {}
)
container_networks: tuple[str, ...] = (
tuple(sorted(nets_map.keys())) if isinstance(nets_map, dict) else ()
)
return OfflineModeReport(
network_name=str(network_inspect.get("Name", "")),
network_internal=network_internal,
container_networks=container_networks,
expected_network=expected_network,
)
# ─────────────────────────── FT-P-18 / AC-8.5 ───────────────────────────
NAV_CAMERA_RAW_DIMENSIONS = (5472, 3648)
NAV_CAMERA_DECODED_DIMENSIONS = (880, 720)
RAW_FRAME_EXTENSIONS = (".jpg", ".jpeg", ".raw", ".dng", ".cr2", ".nef", ".arw", ".bin")
THUMBNAIL_LOG_EXTENSIONS = (".log", ".jsonl", ".txt")
THUMBNAIL_LOG_MAX_SIZE_GB_PER_8H = 1.0
THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H = int(THUMBNAIL_LOG_MAX_SIZE_GB_PER_8H * 1024**3)
@dataclass(frozen=True)
class RawFrameCandidate:
"""One filesystem entry that matched the raw-frame heuristic."""
path: Path
size_bytes: int
dimensions: tuple[int, int] | None
reason: str
@dataclass(frozen=True)
class RawFrameDetectionReport:
"""AC-4 of FT-P-18: zero raw-frame retention."""
candidates: tuple[RawFrameCandidate, ...]
nav_camera_raw_dimensions: tuple[int, int] = NAV_CAMERA_RAW_DIMENSIONS
nav_camera_decoded_dimensions: tuple[int, int] = NAV_CAMERA_DECODED_DIMENSIONS
@property
def candidate_count(self) -> int:
return len(self.candidates)
@property
def passes(self) -> bool:
return self.candidate_count == 0
def detect_raw_frames(
file_specs: Iterable[tuple[Path, int, tuple[int, int] | None]],
*,
raw_dimensions: tuple[int, int] = NAV_CAMERA_RAW_DIMENSIONS,
decoded_dimensions: tuple[int, int] = NAV_CAMERA_DECODED_DIMENSIONS,
raw_extensions: Sequence[str] = RAW_FRAME_EXTENSIONS,
) -> RawFrameDetectionReport:
"""AC-4: detect any file whose extension + dimensions match raw nav frames.
``file_specs`` is an iterable of ``(path, size_bytes, dimensions)``
triples. The scenario test produces this by walking the filesystem
and probing each image file's dimensions; this evaluator only
decides *which* of those triples count as raw frames.
A file matches when:
1. Extension is in ``raw_extensions``, AND
2. ``dimensions`` equals either the raw nav-cam dims (5472×3648,
order-insensitive) OR the H.264-decoded dims (880×720,
order-insensitive).
A file with a raw extension but unknown dimensions does NOT match
(the scenario is expected to fail dimension probe loudly, not be
silently absorbed by the evaluator).
"""
targets = {tuple(sorted(raw_dimensions)), tuple(sorted(decoded_dimensions))}
raw_ext_lower = tuple(ext.lower() for ext in raw_extensions)
candidates: list[RawFrameCandidate] = []
for path, size_bytes, dims in file_specs:
if path.suffix.lower() not in raw_ext_lower:
continue
if dims is None:
continue
if tuple(sorted(dims)) not in targets:
continue
candidates.append(
RawFrameCandidate(
path=path,
size_bytes=size_bytes,
dimensions=dims,
reason=(
f"extension {path.suffix} + dimensions {dims} match nav-camera raw pattern"
),
)
)
return RawFrameDetectionReport(
candidates=tuple(candidates),
nav_camera_raw_dimensions=raw_dimensions,
nav_camera_decoded_dimensions=decoded_dimensions,
)
@dataclass(frozen=True)
class ThumbnailLogBudgetReport:
"""AC-5 of FT-P-18: thumbnail log size budget under 1 GB / 8 h."""
observed_size_bytes: int
observed_duration_h: float
extrapolated_8h_size_bytes: int
max_size_bytes_per_8h: int = THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H
@property
def passes(self) -> bool:
if self.observed_duration_h <= 0:
return False
return self.extrapolated_8h_size_bytes < self.max_size_bytes_per_8h
def evaluate_thumbnail_budget(
observed_size_bytes: int,
observed_duration_h: float,
*,
max_size_bytes_per_8h: int = THUMBNAIL_LOG_MAX_SIZE_BYTES_PER_8H,
) -> ThumbnailLogBudgetReport:
"""AC-5: extrapolate observed thumbnail log size to an 8h flight.
``observed_size_bytes`` is the sum of every thumbnail-log file
under the FDR + cache walk (extensions in
``THUMBNAIL_LOG_EXTENSIONS``). ``observed_duration_h`` is the
wall-clock duration of the replay segment that produced them.
Extrapolation is linear: ``size * (8 / duration_h)``.
Returns a report whose ``passes`` flag holds when
``extrapolated_8h_size_bytes < max_size_bytes_per_8h``.
"""
if observed_size_bytes < 0:
raise ValueError(f"observed_size_bytes must be ≥0, got {observed_size_bytes}")
if max_size_bytes_per_8h <= 0:
raise ValueError(
f"max_size_bytes_per_8h must be >0, got {max_size_bytes_per_8h}"
)
if observed_duration_h <= 0:
extrapolated = -1
else:
extrapolated = int(observed_size_bytes * (8.0 / observed_duration_h))
return ThumbnailLogBudgetReport(
observed_size_bytes=observed_size_bytes,
observed_duration_h=observed_duration_h,
extrapolated_8h_size_bytes=extrapolated,
max_size_bytes_per_8h=max_size_bytes_per_8h,
)
# ─────────────────────── Filesystem walk helpers ───────────────────────
def walk_files(*roots: Path) -> Iterable[Path]:
"""Recursive file iterator over every existing root.
Convenience for the FT-P-18 scenario: stitch together
``fdr_archive_root`` + ``tile_cache_root`` walks under one call.
Non-existent roots are silently skipped (the FDR archive may be
absent on a skip-gated local run — the scenario explicitly checks
that elsewhere).
"""
for root in roots:
if not root.exists():
continue
for p in root.rglob("*"):
if p.is_file():
yield p
def probe_jpeg_dimensions(path: Path) -> tuple[int, int] | None:
"""Return ``(width, height)`` of a JPEG by parsing its SOF marker.
Pure-stdlib JPEG SOF0/SOF1/SOF2 parser — avoids loading the full
image (so a directory walk over hundreds of files is cheap) and
avoids a runtime dep on Pillow/OpenCV here (both are available in
the runner but adding them as a hard import would couple the
evaluator to those packages for what is fundamentally a 32-byte
header read).
Returns ``None`` if the file is not a JPEG, the SOF marker is not
present, or the file is truncated.
"""
try:
with path.open("rb") as fh:
head = fh.read(2)
if head != b"\xff\xd8":
return None
while True:
marker_prefix = fh.read(1)
if not marker_prefix:
return None
if marker_prefix != b"\xff":
return None
marker = fh.read(1)
if not marker:
return None
# SOF markers: 0xC0-0xCF except 0xC4 (DHT), 0xC8 (JPG), 0xCC (DAC)
if marker[0] in (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF):
fh.read(3) # segment length (2) + precision (1)
h_bytes = fh.read(2)
w_bytes = fh.read(2)
if len(h_bytes) != 2 or len(w_bytes) != 2:
return None
height = int.from_bytes(h_bytes, "big")
width = int.from_bytes(w_bytes, "big")
return (width, height)
seg_len_bytes = fh.read(2)
if len(seg_len_bytes) != 2:
return None
seg_len = int.from_bytes(seg_len_bytes, "big")
if seg_len < 2:
return None
fh.seek(seg_len - 2, 1)
except OSError:
return None