"""SUT outbound-estimate schema + WGS84 validation (FT-P-03, FT-P-14). Two thin contract checks shared by AZ-411's scenario file: 1. **Schema completeness** (AC-1 of FT-P-03): the outbound estimate must carry the four documented fields ``lat:float``, ``lon:float``, ``cov_semi_major_m:float``, ``last_satellite_anchor_age_ms:int`` — either inside the ``GPS_INPUT`` / ``MSP2_SENSOR_GPS`` payload, OR on a paired side-channel (per AC-4.3). 2. **Source-label set containment** (AC-2): the side-channel emission is exactly one of ``{satellite_anchored, visual_propagated, dead_reckoned}`` — anything else is a real defect. 3. **WGS84 range** (AC-3 of FT-P-14): decoded ``lat`` ∈ [-90, 90], ``lon`` ∈ [-180, 180]; scaling matches the protocol convention (AP/iNav `lat/lon` are 1e-7 scaled int32). The helpers operate on pure Python dict-like records — the scenario test pulls them from the SITL observer / tlog reader and hands them in. That keeps these helpers unit-testable without any docker harness. Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations from dataclasses import dataclass from typing import Iterable, Mapping REQUIRED_FIELDS: tuple[tuple[str, type], ...] = ( ("lat", float), ("lon", float), ("cov_semi_major_m", float), ("last_satellite_anchor_age_ms", int), ) ALLOWED_SOURCE_LABELS: frozenset[str] = frozenset( {"satellite_anchored", "visual_propagated", "dead_reckoned"} ) # Protocol scaling factors — exact integer 1e-7 per MAVLink GPS_INPUT # (`int32 lat / lon * 1e-7`) and iNav MSP2_SENSOR_GPS (same scaling). LAT_LON_SCALE = 1e-7 @dataclass(frozen=True) class SchemaValidationResult: """Outcome of a single ``validate_estimate_schema`` call.""" ok: bool missing_fields: list[str] wrong_typed_fields: list[str] @dataclass(frozen=True) class SourceLabelValidationResult: ok: bool observed: str | None reason: str | None # filled when not ok @dataclass(frozen=True) class Wgs84ValidationResult: ok: bool lat_deg: float | None lon_deg: float | None reason: str | None def validate_estimate_schema(record: Mapping[str, object]) -> SchemaValidationResult: """AC-1: all four documented fields present + correctly typed. The record may be the merged ``{payload_fields, sidechannel_fields}`` dict the test produces from ``GPS_INPUT.x`` + the paired ``STATUSTEXT`` / ``NAMED_VALUE_FLOAT`` channel. The helper is transport-agnostic; it just walks the four ``REQUIRED_FIELDS`` and checks the type. """ missing: list[str] = [] wrong: list[str] = [] for name, expected in REQUIRED_FIELDS: if name not in record: missing.append(name) continue value = record[name] # Accept bool only when bool is the expected type (Python's # ``isinstance(True, int)`` is True; we don't want that to # silently satisfy ``int``). if expected is int and isinstance(value, bool): wrong.append(name) continue if not isinstance(value, expected): wrong.append(name) return SchemaValidationResult( ok=not missing and not wrong, missing_fields=missing, wrong_typed_fields=wrong, ) def validate_source_label(label: object) -> SourceLabelValidationResult: """AC-2: label is exactly one of the three documented strings.""" if not isinstance(label, str): return SourceLabelValidationResult( ok=False, observed=None, reason=f"label is {type(label).__name__}, expected str" ) if label in ALLOWED_SOURCE_LABELS: return SourceLabelValidationResult(ok=True, observed=label, reason=None) return SourceLabelValidationResult( ok=False, observed=label, reason=f"label {label!r} not in {sorted(ALLOWED_SOURCE_LABELS)}" ) def validate_wgs84_range( lat_decoded_deg: float, lon_decoded_deg: float ) -> Wgs84ValidationResult: """AC-3 of FT-P-14: lat ∈ [-90, 90], lon ∈ [-180, 180].""" if not isinstance(lat_decoded_deg, (int, float)) or not isinstance( lon_decoded_deg, (int, float) ): return Wgs84ValidationResult( ok=False, lat_deg=None, lon_deg=None, reason="lat/lon not numeric", ) if lat_decoded_deg != lat_decoded_deg or lon_decoded_deg != lon_decoded_deg: return Wgs84ValidationResult( ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg, reason="lat/lon is NaN", ) if not -90.0 <= lat_decoded_deg <= 90.0: return Wgs84ValidationResult( ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg, reason=f"lat {lat_decoded_deg} out of [-90, 90]", ) if not -180.0 <= lon_decoded_deg <= 180.0: return Wgs84ValidationResult( ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg, reason=f"lon {lon_decoded_deg} out of [-180, 180]", ) return Wgs84ValidationResult( ok=True, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg, reason=None ) def decode_lat_lon_int32(lat_e7: int, lon_e7: int) -> tuple[float, float]: """Decode the AP/iNav 1e-7 int32 wire format to WGS84 degrees. Raises ValueError for inputs outside the int32 range — that's a transport corruption, not an out-of-bounds geographic value, and the test should surface it as such. """ INT32_MIN = -(2 ** 31) INT32_MAX = (2 ** 31) - 1 if not INT32_MIN <= lat_e7 <= INT32_MAX: raise ValueError(f"lat_e7 {lat_e7} outside int32 range") if not INT32_MIN <= lon_e7 <= INT32_MAX: raise ValueError(f"lon_e7 {lon_e7} outside int32 range") return lat_e7 * LAT_LON_SCALE, lon_e7 * LAT_LON_SCALE def aggregate_validations( records: Iterable[Mapping[str, object]], ) -> tuple[list[SchemaValidationResult], list[Wgs84ValidationResult]]: """Run schema + WGS84 validations over a record stream. Used by FT-P-03 / FT-P-14 to assert "every record satisfies both contracts" — typically against a single-image push (1 outbound record) but stream-friendly for soak-test re-use. """ schemas: list[SchemaValidationResult] = [] wgs84s: list[Wgs84ValidationResult] = [] for rec in records: schemas.append(validate_estimate_schema(rec)) lat = rec.get("lat") lon = rec.get("lon") if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): wgs84s.append(validate_wgs84_range(float(lat), float(lon))) else: wgs84s.append( Wgs84ValidationResult( ok=False, lat_deg=None, lon_deg=None, reason="missing or non-numeric lat/lon for WGS84 check", ) ) return schemas, wgs84s