"""Mid-flight tile generation + freshness evaluators (AZ-422 / FT-P-17 + FT-N-06).

Pure-logic evaluators sourced from the FDR archive (per-tile generation
records + freshness-gate events) and the mock-suite-sat-service audit log
(landing-time upload acks).

Sub-scenarios:

* **FT-P-17 / AC-8.4** — four evaluators:
    * generation cadence (≥ 1 tile / 3 s of high-quality nav frames);
    * quality-metadata sufficiency (per-tile fields the Service voting
      layer needs — Mode B Fact #105: capture_utc, source_provider,
      resolution_m_per_px, cloud_coverage_pct, geo_accuracy_m, plus
      publish-request fields: tile_id, bbox_wgs84, zoom_level,
      descriptor_sha256, payload_size_bytes);
    * dedup (no two tiles share footprint within ±1 m AND GSD within ±5 %);
    * landing-event upload (every generated tile has an audit entry in the
      mock-suite-sat-service).
* **FT-N-06 / AC-NEW-6** — two evaluators:
    * capture-date freshness (|capture_utc − generated_at| ≤ 60 s);
    * freshness-gate (no ``tile-load-rejected: stale`` FDR event for a
      freshly generated tile).

All evaluators consume Python dataclasses / dicts. The HTTP fetch and FDR
walk live in scenario tests; this module only decides whether the parsed
inputs satisfy the AC.

Public-boundary discipline: NO imports from ``src/gps_denied_onboard``.
"""

from __future__ import annotations

import math
from dataclasses import dataclass
from datetime import datetime, timezone
from itertools import combinations
from typing import Iterable, Sequence

from .geo import distance_m

# ─────────────────────── FDR record kinds & schema ───────────────────────

MID_FLIGHT_TILE_FDR_KIND = "mid-flight-tile-output"
TILE_LOAD_REJECTED_FDR_KIND = "tile-load-rejected"
TILE_LOAD_REJECTED_STALE_REASON = "stale"

MIN_TILES_PER_HIGH_QUALITY_WINDOW_S = 3.0  # ≥ 1 tile per ~3 s of high-quality nav frames
CAPTURE_DATE_FRESHNESS_TOLERANCE_S = 60.0
DEDUP_FOOTPRINT_TOLERANCE_M = 1.0
DEDUP_GSD_TOLERANCE_FRACTION = 0.05  # ±5 %

# Schema mirror — must stay in sync with ``e2e/fixtures/mock-suite-sat/app.py``
# ``TilePublishRequest`` + ``TileQualityMetadata``.
TILE_REQUIRED_TOP_LEVEL_FIELDS: tuple[str, ...] = (
    "tile_id",
    "bbox_wgs84",
    "zoom_level",
    "descriptor_sha256",
    "payload_size_bytes",
    "quality",
)
TILE_REQUIRED_QUALITY_FIELDS: tuple[str, ...] = (
    "capture_utc",
    "source_provider",
    "resolution_m_per_px",
    "cloud_coverage_pct",
    "geo_accuracy_m",
)


@dataclass(frozen=True)
class TileSpec:
    """Public-boundary projection of one mid-flight-tile-output record.

    Sourced from the FDR ``mid-flight-tile-output`` record. Mirrors the
    TilePublishRequest schema so the same dataclass feeds both the
    landing-event upload comparison (AC-4) and the per-tile evaluators.

    ``bbox_wgs84`` is ``(west_lon, south_lat, east_lon, north_lat)`` matching
    the mock-suite-sat-service contract.

    ``generated_at_monotonic_ms`` is the SUT's emission timestamp from the FDR
    envelope's ``ts`` (projected to monotonic ms by the FDR reader).
    ``capture_utc_iso`` is the per-tile field — they should agree within
    ``CAPTURE_DATE_FRESHNESS_TOLERANCE_S`` (FT-N-06).
    """

    tile_id: str
    bbox_wgs84: tuple[float, float, float, float]
    zoom_level: int
    descriptor_sha256: str
    payload_size_bytes: int
    quality: dict[str, object]
    generated_at_monotonic_ms: int
    capture_utc_iso: str | None = None  # convenience accessor; same as quality["capture_utc"]


def bbox_centre(bbox: tuple[float, float, float, float]) -> tuple[float, float]:
    """Return ``(lat, lon)`` of a WGS84 bbox ``(west, south, east, north)``.

    The centre is the arithmetic mean of the opposing edges. Note that the
    output order (lat, lon) is swapped relative to the bbox order (lon, lat).
    """
    west, south, east, north = bbox
    return ((south + north) / 2.0, (west + east) / 2.0)


# ─────────────────────────── FT-P-17 / AC-1 ───────────────────────────


@dataclass(frozen=True)
class TileGenerationRateReport:
    """AC-1 of FT-P-17: ≥ 1 tile per ~3 s of high-quality nav frames."""

    tile_count: int
    high_quality_window_s: float
    observed_rate_per_3s: float
    min_required_rate_per_3s: float = 1.0

    @property
    def passes(self) -> bool:
        """True iff the window is positive and the observed rate meets the minimum."""
        # A non-positive window carries no evidence, so it can never pass.
        if self.high_quality_window_s <= 0:
            return False
        return self.observed_rate_per_3s >= self.min_required_rate_per_3s


def evaluate_tile_generation_rate(
    tiles: Sequence[TileSpec],
    high_quality_window_s: float,
    *,
    window_s_per_tile: float = MIN_TILES_PER_HIGH_QUALITY_WINDOW_S,
) -> TileGenerationRateReport:
    """AC-1: rate of generated tiles over the high-quality nav-frame window.

    ``high_quality_window_s`` is the total wall-clock seconds during the
    replay that produced "high-quality" nav frames (defined by AC-2.1a
    normal-segment in `_docs/02_document/tests/blackbox-tests.md`). The
    scenario test computes this from the FDR's segment-quality records; the
    helper only divides.

    The AC threshold is ≥ 1 tile per ``window_s_per_tile`` seconds. Normalised
    to a "tiles per 3 s" rate so the report is unitless.

    Raises:
        ValueError: if ``window_s_per_tile`` is not strictly positive.
    """
    if window_s_per_tile <= 0:
        raise ValueError(f"window_s_per_tile must be > 0, got {window_s_per_tile}")
    if high_quality_window_s <= 0:
        # No usable window: report a zero rate instead of dividing by zero;
        # the report's ``passes`` is False for such a window.
        return TileGenerationRateReport(
            tile_count=len(tiles),
            high_quality_window_s=high_quality_window_s,
            observed_rate_per_3s=0.0,
        )
    rate = (len(tiles) / high_quality_window_s) * window_s_per_tile
    return TileGenerationRateReport(
        tile_count=len(tiles),
        high_quality_window_s=high_quality_window_s,
        observed_rate_per_3s=rate,
    )


# ─────────────────────────── FT-P-17 / AC-2 ───────────────────────────


@dataclass(frozen=True)
class TileQualityEntryReport:
    """Per-tile schema-completeness result."""

    tile_id: str
    missing_top_level_fields: tuple[str, ...]
    missing_quality_fields: tuple[str, ...]

    @property
    def passes(self) -> bool:
        """True iff no required top-level or quality field is missing."""
        return not self.missing_top_level_fields and not self.missing_quality_fields


@dataclass(frozen=True)
class TileQualityReport:
    """AC-2 of FT-P-17: every tile carries the Mode B Fact #105 fields."""

    entries: tuple[TileQualityEntryReport, ...]

    @property
    def failing_entries(self) -> tuple[TileQualityEntryReport, ...]:
        """Entries with at least one missing field."""
        return tuple(e for e in self.entries if not e.passes)

    @property
    def passes(self) -> bool:
        """True iff there is at least one entry and none of them fail."""
        # An empty report fails: zero tiles must not vacuously satisfy the AC.
        if not self.entries:
            return False
        return not self.failing_entries


def evaluate_tile_quality_metadata(
    tiles: Sequence[TileSpec],
    *,
    required_top_level: Sequence[str] = TILE_REQUIRED_TOP_LEVEL_FIELDS,
    required_quality: Sequence[str] = TILE_REQUIRED_QUALITY_FIELDS,
) -> TileQualityReport:
    """AC-2: every tile has all top-level + quality fields populated.

    "Populated" means the key is present in the underlying dict
    representation AND the value is not ``None``. A ``TileSpec`` constructed
    by the scenario test from the FDR record carries these fields as
    dataclass attributes; this helper still re-checks the quality dict for
    completeness because the dict mirror is the actual contract with the
    Service voting layer.

    Returns one ``TileQualityEntryReport`` per input tile, in input order.
    """
    entries: list[TileQualityEntryReport] = []
    for tile in tiles:
        missing_top: list[str] = []
        for f in required_top_level:
            # "quality" itself is validated field-by-field below.
            if f == "quality":
                continue
            value = getattr(tile, _top_level_field_to_attr(f), None)
            if value is None:
                missing_top.append(f)

        missing_quality: list[str] = []
        if not isinstance(tile.quality, dict):
            # No dict at all: every required quality field counts as missing.
            missing_quality = list(required_quality)
        else:
            for f in required_quality:
                if f not in tile.quality or tile.quality[f] is None:
                    missing_quality.append(f)

        entries.append(
            TileQualityEntryReport(
                tile_id=tile.tile_id or "",
                missing_top_level_fields=tuple(missing_top),
                missing_quality_fields=tuple(missing_quality),
            )
        )
    return TileQualityReport(entries=tuple(entries))


def _top_level_field_to_attr(field: str) -> str:
    """Map TilePublishRequest field name to the TileSpec attribute."""
    return field  # 1:1 mapping; documented for future drift handling


# ─────────────────────────── FT-P-17 / AC-3 ───────────────────────────


@dataclass(frozen=True)
class TileDedupReport:
    """AC-3 of FT-P-17: no two tiles share a (footprint, GSD) bin."""

    duplicate_pairs: tuple[tuple[str, str], ...]
    footprint_tolerance_m: float = DEDUP_FOOTPRINT_TOLERANCE_M
    gsd_tolerance_fraction: float = DEDUP_GSD_TOLERANCE_FRACTION

    @property
    def duplicate_count(self) -> int:
        """Number of offending tile pairs."""
        return len(self.duplicate_pairs)

    @property
    def passes(self) -> bool:
        """True iff no duplicate pair was found."""
        return self.duplicate_count == 0


def evaluate_dedup(
    tiles: Sequence[TileSpec],
    *,
    footprint_tolerance_m: float = DEDUP_FOOTPRINT_TOLERANCE_M,
    gsd_tolerance_fraction: float = DEDUP_GSD_TOLERANCE_FRACTION,
) -> TileDedupReport:
    """AC-3: pair-wise dedup check.

    Two tiles are duplicates iff:

    * Vincenty distance between their bbox centres ≤ ``footprint_tolerance_m``
      AND
    * ``|gsd_a − gsd_b| / max(gsd_a, gsd_b) ≤ gsd_tolerance_fraction``

    Tiles without a usable GSD (see ``_extract_gsd``) and pairs whose larger
    GSD is exactly zero are skipped, since the relative delta is undefined.

    O(N²) — fine for the < 100 tiles per 5 min replay scenarios produce.
    Returns the offending ``(tile_id, tile_id)`` pairs, earlier tile first.

    Raises:
        ValueError: if either tolerance is negative.
    """
    if footprint_tolerance_m < 0:
        raise ValueError(f"footprint_tolerance_m must be ≥0, got {footprint_tolerance_m}")
    if gsd_tolerance_fraction < 0:
        raise ValueError(
            f"gsd_tolerance_fraction must be ≥0, got {gsd_tolerance_fraction}"
        )

    # Centres are computed for every tile (not only GSD-bearing ones) so a
    # malformed bbox still surfaces as an error rather than being skipped.
    centres: list[tuple[float, float]] = [bbox_centre(t.bbox_wgs84) for t in tiles]
    gsds: list[float | None] = [_extract_gsd(t) for t in tiles]
    candidates = [
        (tile.tile_id, centre, gsd)
        for tile, centre, gsd in zip(tiles, centres, gsds)
        if gsd is not None
    ]

    pairs: list[tuple[str, str]] = []
    # combinations() yields (earlier, later) pairs in input order.
    for (id_a, centre_a, gsd_a), (id_b, centre_b, gsd_b) in combinations(candidates, 2):
        denom = max(gsd_a, gsd_b)
        if denom == 0:
            continue
        # Cheap GSD test first; the geodesic distance is only computed for
        # pairs that already match on resolution.
        if abs(gsd_a - gsd_b) / denom > gsd_tolerance_fraction:
            continue
        d_m = distance_m(centre_a[0], centre_a[1], centre_b[0], centre_b[1])
        if d_m <= footprint_tolerance_m:
            pairs.append((id_a, id_b))

    return TileDedupReport(
        duplicate_pairs=tuple(pairs),
        footprint_tolerance_m=footprint_tolerance_m,
        gsd_tolerance_fraction=gsd_tolerance_fraction,
    )


def _extract_gsd(tile: TileSpec) -> float | None:
    """Pull GSD (resolution_m_per_px) from the tile's quality dict.

    Returns ``None`` when the quality payload is not a dict, or when the
    value is missing, non-numeric, a ``bool``, or not finite (NaN / ±inf).
    """
    if not isinstance(tile.quality, dict):
        return None
    raw = tile.quality.get("resolution_m_per_px")
    # ``bool`` is a subclass of ``int``; True/False is not a resolution.
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        return None
    gsd = float(raw)
    # A NaN GSD would slip past the ``> tolerance`` comparison in
    # ``evaluate_dedup`` (NaN comparisons are False) and yield false duplicates.
    if not math.isfinite(gsd):
        return None
    return gsd


# ─────────────────────────── FT-P-17 / AC-4 ───────────────────────────


@dataclass(frozen=True)
class TileUploadAckReport:
    """AC-4 of FT-P-17: every generated tile uploaded with HTTP 202."""

    generated_tile_ids: tuple[str, ...]
    audit_tile_ids: tuple[str, ...]
    missing_from_audit: tuple[str, ...]

    @property
    def passes(self) -> bool:
        """True iff at least one tile was generated and none is missing from the audit."""
        # Zero generated tiles must not vacuously satisfy the AC.
        if not self.generated_tile_ids:
            return False
        return not self.missing_from_audit


def evaluate_upload_acks(
    generated_tiles: Sequence[TileSpec],
    audit_entries: Sequence[dict],
) -> TileUploadAckReport:
    """AC-4: every generated tile_id appears in the mock-suite-sat-service audit.

    The mock-suite-sat-service ``POST /tiles`` endpoint records HTTP 202
    responses to its run-scoped audit log; a tile that did not return 202
    (i.e., was rejected with 400 or any forced-5xx) is NOT in the audit. So a
    tile_id present in ``generated_tiles`` but absent from ``audit_entries``
    is by construction a missing ack.

    ``audit_entries`` is the ``entries`` field of the JSON response from
    ``GET /tiles/audit?run_id=``. Entries that are not dicts or lack a
    ``tile_id`` key are ignored.
    """
    generated_ids = tuple(t.tile_id for t in generated_tiles)
    audit_ids = tuple(
        e["tile_id"] for e in audit_entries if isinstance(e, dict) and "tile_id" in e
    )
    audit_id_set = set(audit_ids)  # O(1) membership for the scan below
    missing = tuple(tid for tid in generated_ids if tid not in audit_id_set)
    return TileUploadAckReport(
        generated_tile_ids=generated_ids,
        audit_tile_ids=audit_ids,
        missing_from_audit=missing,
    )


# ─────────────────────────── FT-N-06 / AC-5 ───────────────────────────


@dataclass(frozen=True)
class CaptureDateFreshnessEntryReport:
    """Per-tile drift between ``capture_utc`` and ``generated_at``.

    Whether the drift passes the AC threshold is decided at the
    ``CaptureDateFreshnessReport`` level because the tolerance is a
    report-wide knob (AC-5 stipulates 60 s globally).
    """

    tile_id: str
    drift_s: float | None  # None when capture_utc cannot be parsed


@dataclass(frozen=True)
class CaptureDateFreshnessReport:
    """AC-5 of FT-N-06: |capture_utc - generated_at_wall_clock| ≤ 60 s."""

    entries: tuple[CaptureDateFreshnessEntryReport, ...]
    tolerance_s: float = CAPTURE_DATE_FRESHNESS_TOLERANCE_S

    @property
    def failing_entries(self) -> tuple[CaptureDateFreshnessEntryReport, ...]:
        """Entries whose drift is unknown or exceeds ``tolerance_s`` in magnitude."""
        return tuple(
            e
            for e in self.entries
            if e.drift_s is None or abs(e.drift_s) > self.tolerance_s
        )

    @property
    def passes(self) -> bool:
        """True iff there is at least one entry and none of them fail."""
        if not self.entries:
            return False
        return not self.failing_entries


def evaluate_capture_date_freshness(
    tiles: Sequence[TileSpec],
    *,
    tolerance_s: float = CAPTURE_DATE_FRESHNESS_TOLERANCE_S,
) -> CaptureDateFreshnessReport:
    """AC-5: per-tile capture_utc drift against generated_at_monotonic_ms.

    Drift is signed: ``capture_utc − generated_at``. A drift of +0 is
    "capture happened at generation"; negative drift means capture happened
    BEFORE generation (the usual direction — capture is instantaneous,
    generation is the orthorectification step that follows).

    A tile whose ``capture_utc`` cannot be parsed as ISO 8601 records
    drift_s = None and fails the AC.

    Raises:
        ValueError: if ``tolerance_s`` is not strictly positive.
    """
    if tolerance_s <= 0:
        raise ValueError(f"tolerance_s must be > 0, got {tolerance_s}")

    entries: list[CaptureDateFreshnessEntryReport] = []
    for tile in tiles:
        capture_str = tile.capture_utc_iso
        # Fall back to the quality dict only when the convenience field is unset.
        if capture_str is None and isinstance(tile.quality, dict):
            raw = tile.quality.get("capture_utc")
            if isinstance(raw, str):
                capture_str = raw

        drift: float | None
        if capture_str is None:
            drift = None
        else:
            parsed = _parse_iso8601_utc_seconds(capture_str)
            if parsed is None:
                drift = None
            else:
                # NOTE(review): this subtracts a value named "monotonic ms"
                # from epoch seconds; it presumes the FDR reader projects
                # ``ts`` onto the Unix epoch — confirm against the reader.
                drift = parsed - (tile.generated_at_monotonic_ms / 1000.0)
        entries.append(
            CaptureDateFreshnessEntryReport(tile_id=tile.tile_id, drift_s=drift)
        )
    return CaptureDateFreshnessReport(
        entries=tuple(entries), tolerance_s=tolerance_s
    )


def _parse_iso8601_utc_seconds(ts: str) -> float | None:
    """Parse ISO 8601 ``ts`` into seconds-since-epoch; ``None`` on failure.

    Accepts the trailing ``Z`` shorthand that ``datetime.fromisoformat`` did
    not accept until 3.11. A timestamp without an offset is interpreted as
    UTC. Non-string input returns ``None`` rather than raising.
    """
    # FDR-sourced values are not guaranteed to be strings; without this guard
    # ``ts.endswith`` raises AttributeError, which the handler below does not
    # catch, breaking the "None on failure" contract.
    if not isinstance(ts, str):
        return None
    try:
        normalised = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
        dt = datetime.fromisoformat(normalised)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.timestamp()
    except (TypeError, ValueError):
        return None


# ─────────────────────────── FT-N-06 / AC-6 ───────────────────────────


@dataclass(frozen=True)
class FreshnessGateReport:
    """AC-6 of FT-N-06: no `tile-load-rejected: stale` for freshly generated tiles."""

    generated_tile_ids: tuple[str, ...]
    stale_rejections: tuple[str, ...]

    @property
    def passes(self) -> bool:
        """True iff no generated tile was rejected as stale."""
        return not self.stale_rejections


def evaluate_freshness_gate(
    generated_tiles: Sequence[TileSpec],
    fdr_rejection_records: Iterable[dict],
    *,
    stale_reason: str = TILE_LOAD_REJECTED_STALE_REASON,
) -> FreshnessGateReport:
    """AC-6: any ``tile-load-rejected: stale`` for a freshly generated tile fails.

    ``fdr_rejection_records`` is the payload dict of each FDR record whose
    ``record_type == TILE_LOAD_REJECTED_FDR_KIND``. A "stale" rejection sets
    ``reason == "stale"``. If the rejected tile_id matches a generated
    tile_id, the freshness gate misclassified it.

    The tile id is read from the payload's ``id`` key, falling back to
    ``tile_id``. Non-dict payloads are ignored. A tile rejected several times
    appears once per rejection in ``stale_rejections``.
    """
    generated_ids = tuple(t.tile_id for t in generated_tiles)
    gen_id_set = set(generated_ids)
    stale: list[str] = []
    for payload in fdr_rejection_records:
        if not isinstance(payload, dict):
            continue
        reason = payload.get("reason")
        if reason != stale_reason:
            continue
        tile_id = payload.get("id") or payload.get("tile_id")
        if isinstance(tile_id, str) and tile_id in gen_id_set:
            stale.append(tile_id)
    return FreshnessGateReport(
        generated_tile_ids=generated_ids, stale_rejections=tuple(stale)
    )