"""Aged-tile rejection evaluator (AZ-427 / FT-N-05). Pure-logic helpers for AC-8.2 / AC-NEW-6 negative scenario: when the SUT mounts a tile cache whose manifest dates exceed the freshness window for the configured sector, NO outbound emission may carry ``source_label = satellite_anchored``. The AC accepts two distinct evidence shapes (per the task spec): * **Signal A — load-time rejection**: an FDR ``tile-load-rejected`` record with ``reason == "stale"`` was emitted at startup for the stale tile(s). Conceptually the SUT refused to load aged tiles at all, so no later emission could be anchored on them. * **Signal B — per-frame downgrade**: the SUT loaded the tiles but the freshness gate downgrades every outbound emission to ``source_label ∈ {visual_propagated, dead_reckoned}``. A run passes iff at least one signal holds AND no emission slipped through with ``satellite_anchored``. A run fails the moment ANY frame carries ``satellite_anchored`` regardless of which signal otherwise held. The scenario test pulls emissions from the SITL observer / FDR stream and feeds a typed list to ``evaluate_aged_tile_rejection``; this module decides whether the parsed inputs satisfy the AC. Public-boundary discipline: NO imports from ``src/gps_denied_onboard``. """ from __future__ import annotations from dataclasses import dataclass from typing import Iterable, Sequence from .estimate_schema import ALLOWED_SOURCE_LABELS # ─────────────────────── FDR record kinds & constants ─────────────────────── TILE_LOAD_REJECTED_FDR_KIND = "tile-load-rejected" TILE_LOAD_REJECTED_STALE_REASON = "stale" SATELLITE_ANCHORED_LABEL = "satellite_anchored" ACCEPTABLE_DOWNGRADE_LABELS: frozenset[str] = frozenset( {"visual_propagated", "dead_reckoned"} ) # Sector configuration values per AC-8.2 — surface them here so the # scenario test can declare its sub-case configuration in the same # vocabulary as the AC. SECTOR_ACTIVE_CONFLICT = "active_conflict" SECTOR_REAR = "rear" # Sub-case bindings: synth-age fixture name → sector value the SUT must # be configured for, per AC-8.2 of the AZ-427 spec. Used by the scenario # test to declare its parameterisation in spec vocabulary. AGED_FIXTURE_SECTOR_BINDINGS: tuple[tuple[str, str], ...] = ( ("synth-age-7mo", SECTOR_ACTIVE_CONFLICT), ("synth-age-13mo", SECTOR_REAR), ) @dataclass(frozen=True) class SourceLabelEmission: """One outbound estimate's source-label observation. ``frame_id`` is whatever identifier the scenario test attaches to the per-frame emission (e.g., still-image filename or per-frame monotonic counter). It is opaque to this helper — only the label and its membership in the allowed sets matter for AC evaluation. ``label`` is the SUT's emitted ``source_label`` value; the helper validates it against ``ALLOWED_SOURCE_LABELS`` from ``estimate_schema``. """ frame_id: str label: str @dataclass(frozen=True) class StaleTileRejection: """One FDR ``tile-load-rejected`` event with ``reason == "stale"``.""" tile_id: str reason: str # always ``"stale"`` for rejections of interest @dataclass(frozen=True) class AgedTileRejectionReport: """AC-1 / AC-2 of FT-N-05: aged tiles must never produce satellite_anchored. Decision matrix: * ``has_stale_rejection`` True AND ``anchored_count`` == 0 → PASS via Signal A. * ``has_stale_rejection`` False AND every frame in the downgrade set → PASS via Signal B. * Any frame with ``label == "satellite_anchored"`` → FAIL, period. * No emissions observed AND no stale rejection → FAIL (we can't conclude the freshness gate fired at all). * Any frame with a label outside ``ALLOWED_SOURCE_LABELS`` → FAIL (this is itself a contract violation per FT-P-03 / AC-1.4 — we surface it loudly via ``illegal_labels``). """ sub_case_id: str fixture: str sector: str emissions_observed: int anchored_frame_ids: tuple[str, ...] downgrade_frame_count: int illegal_labels: tuple[tuple[str, str], ...] # (frame_id, label) stale_rejections: tuple[str, ...] # tile_ids @property def has_stale_rejection(self) -> bool: return bool(self.stale_rejections) @property def anchored_count(self) -> int: return len(self.anchored_frame_ids) @property def signal_a_holds(self) -> bool: """Signal A: SUT rejected the aged tiles at load time.""" return self.has_stale_rejection and self.anchored_count == 0 @property def signal_b_holds(self) -> bool: """Signal B: SUT loaded but downgrades every emission. Requires at least one observation AND every observed emission falls in the downgrade set (and zero anchored emissions). """ return ( self.emissions_observed > 0 and self.downgrade_frame_count == self.emissions_observed and self.anchored_count == 0 ) @property def passes(self) -> bool: if self.illegal_labels: return False if self.anchored_count > 0: return False return self.signal_a_holds or self.signal_b_holds def evaluate_aged_tile_rejection( sub_case_id: str, fixture: str, sector: str, emissions: Sequence[SourceLabelEmission], rejections: Iterable[StaleTileRejection], ) -> AgedTileRejectionReport: """AC-1 / AC-2: decide a single sub-case. ``sub_case_id`` is the human-readable label the test attaches to this sub-case for traceability (e.g., "7mo_active_conflict"). """ anchored: list[str] = [] downgrade: int = 0 illegal: list[tuple[str, str]] = [] for em in emissions: if em.label not in ALLOWED_SOURCE_LABELS: illegal.append((em.frame_id, em.label)) continue if em.label == SATELLITE_ANCHORED_LABEL: anchored.append(em.frame_id) elif em.label in ACCEPTABLE_DOWNGRADE_LABELS: downgrade += 1 stale_tile_ids = tuple(r.tile_id for r in rejections if r.reason == TILE_LOAD_REJECTED_STALE_REASON) return AgedTileRejectionReport( sub_case_id=sub_case_id, fixture=fixture, sector=sector, emissions_observed=len(emissions), anchored_frame_ids=tuple(anchored), downgrade_frame_count=downgrade, illegal_labels=tuple(illegal), stale_rejections=stale_tile_ids, ) # ─────────────────────── FDR record projection ─────────────────────── def project_stale_rejection_payload(payload: object) -> StaleTileRejection | None: """Project one FDR ``tile-load-rejected`` payload onto a ``StaleTileRejection``. The payload is expected to carry: * ``tile_id`` or ``id`` (str) * ``reason`` (str) — only ``"stale"`` is relevant for this AC Returns ``None`` when malformed OR when the reason is not stale — AC-NEW-6 cares only about the stale-rejection subset. """ if not isinstance(payload, dict): return None reason = payload.get("reason") if reason != TILE_LOAD_REJECTED_STALE_REASON: return None raw_id = payload.get("tile_id") or payload.get("id") if not isinstance(raw_id, str): return None return StaleTileRejection(tile_id=raw_id, reason=reason) def iter_stale_rejection_payloads(records: Iterable[object]) -> Iterable[StaleTileRejection]: """Filter records → yield well-formed stale ``StaleTileRejection`` events.""" for rec in records: rt = getattr(rec, "record_type", None) if rt != TILE_LOAD_REJECTED_FDR_KIND: continue rejection = project_stale_rejection_payload(getattr(rec, "payload", None)) if rejection is not None: yield rejection