Files
gps-denied-onboard/e2e/runner/helpers/estimate_schema.py
T
Oleksandr Bezdieniezhnykh 702a0c0ff3 [AZ-408] [AZ-410] [AZ-411] Batch 69: synth injectors + FT-P-02/03/14
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators:
- outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset
- blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment;
  AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas
- multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage
- fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard
- _common.py: derive_rng + tile-manifest reader + tmpfs helpers
- injector_fixtures.py: pytest fixtures wired via runner conftest

AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors:
- anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction,
  AC-4 monotonicity check, CSV evidence
- test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper
  NotImplementedError (frame_source_replay / fdr_reader / imu_replay)

AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84:
- estimate_schema.py: AC-1 schema completeness, AC-2 source-label set
  containment, AC-3 WGS84 range + int32 1e-7 decode
- test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario

Tests: 248 unit tests pass (+91 vs batch 68).
Reports: batch_69_report.md, batch_69_review.md (PASS),
cumulative_review_batches_67-69_cycle1_report.md (PASS).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 17:54:00 +03:00

189 lines
6.8 KiB
Python

"""SUT outbound-estimate schema + WGS84 validation (FT-P-03, FT-P-14).
Two thin contract checks shared by AZ-411's scenario file:
1. **Schema completeness** (AC-1 of FT-P-03):
the outbound estimate must carry the four documented fields
``lat:float``, ``lon:float``, ``cov_semi_major_m:float``,
``last_satellite_anchor_age_ms:int`` — either inside the
``GPS_INPUT`` / ``MSP2_SENSOR_GPS`` payload, OR on a paired
side-channel (per AC-4.3).
2. **Source-label set containment** (AC-2): the side-channel emission
is exactly one of ``{satellite_anchored, visual_propagated,
dead_reckoned}`` — anything else is a real defect.
3. **WGS84 range** (AC-3 of FT-P-14): decoded ``lat`` ∈ [-90, 90],
``lon`` ∈ [-180, 180]; scaling matches the protocol convention
(AP/iNav `lat/lon` are 1e-7 scaled int32).
The helpers operate on pure Python dict-like records — the scenario
test pulls them from the SITL observer / tlog reader and hands them in.
That keeps these helpers unit-testable without any docker harness.
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Mapping
REQUIRED_FIELDS: tuple[tuple[str, type], ...] = (
("lat", float),
("lon", float),
("cov_semi_major_m", float),
("last_satellite_anchor_age_ms", int),
)
ALLOWED_SOURCE_LABELS: frozenset[str] = frozenset(
{"satellite_anchored", "visual_propagated", "dead_reckoned"}
)
# Protocol scaling factors — exact integer 1e-7 per MAVLink GPS_INPUT
# (`int32 lat / lon * 1e-7`) and iNav MSP2_SENSOR_GPS (same scaling).
LAT_LON_SCALE = 1e-7
@dataclass(frozen=True)
class SchemaValidationResult:
"""Outcome of a single ``validate_estimate_schema`` call."""
ok: bool
missing_fields: list[str]
wrong_typed_fields: list[str]
@dataclass(frozen=True)
class SourceLabelValidationResult:
ok: bool
observed: str | None
reason: str | None # filled when not ok
@dataclass(frozen=True)
class Wgs84ValidationResult:
ok: bool
lat_deg: float | None
lon_deg: float | None
reason: str | None
def validate_estimate_schema(record: Mapping[str, object]) -> SchemaValidationResult:
"""AC-1: all four documented fields present + correctly typed.
The record may be the merged ``{payload_fields, sidechannel_fields}``
dict the test produces from ``GPS_INPUT.x`` + the paired
``STATUSTEXT`` / ``NAMED_VALUE_FLOAT`` channel. The helper is
transport-agnostic; it just walks the four ``REQUIRED_FIELDS`` and
checks the type.
"""
missing: list[str] = []
wrong: list[str] = []
for name, expected in REQUIRED_FIELDS:
if name not in record:
missing.append(name)
continue
value = record[name]
# Accept bool only when bool is the expected type (Python's
# ``isinstance(True, int)`` is True; we don't want that to
# silently satisfy ``int``).
if expected is int and isinstance(value, bool):
wrong.append(name)
continue
if not isinstance(value, expected):
wrong.append(name)
return SchemaValidationResult(
ok=not missing and not wrong,
missing_fields=missing,
wrong_typed_fields=wrong,
)
def validate_source_label(label: object) -> SourceLabelValidationResult:
"""AC-2: label is exactly one of the three documented strings."""
if not isinstance(label, str):
return SourceLabelValidationResult(
ok=False, observed=None, reason=f"label is {type(label).__name__}, expected str"
)
if label in ALLOWED_SOURCE_LABELS:
return SourceLabelValidationResult(ok=True, observed=label, reason=None)
return SourceLabelValidationResult(
ok=False, observed=label, reason=f"label {label!r} not in {sorted(ALLOWED_SOURCE_LABELS)}"
)
def validate_wgs84_range(
lat_decoded_deg: float, lon_decoded_deg: float
) -> Wgs84ValidationResult:
"""AC-3 of FT-P-14: lat ∈ [-90, 90], lon ∈ [-180, 180]."""
if not isinstance(lat_decoded_deg, (int, float)) or not isinstance(
lon_decoded_deg, (int, float)
):
return Wgs84ValidationResult(
ok=False, lat_deg=None, lon_deg=None,
reason="lat/lon not numeric",
)
if lat_decoded_deg != lat_decoded_deg or lon_decoded_deg != lon_decoded_deg:
return Wgs84ValidationResult(
ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg,
reason="lat/lon is NaN",
)
if not -90.0 <= lat_decoded_deg <= 90.0:
return Wgs84ValidationResult(
ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg,
reason=f"lat {lat_decoded_deg} out of [-90, 90]",
)
if not -180.0 <= lon_decoded_deg <= 180.0:
return Wgs84ValidationResult(
ok=False, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg,
reason=f"lon {lon_decoded_deg} out of [-180, 180]",
)
return Wgs84ValidationResult(
ok=True, lat_deg=lat_decoded_deg, lon_deg=lon_decoded_deg, reason=None
)
def decode_lat_lon_int32(lat_e7: int, lon_e7: int) -> tuple[float, float]:
"""Decode the AP/iNav 1e-7 int32 wire format to WGS84 degrees.
Raises ValueError for inputs outside the int32 range — that's a
transport corruption, not an out-of-bounds geographic value, and
the test should surface it as such.
"""
INT32_MIN = -(2 ** 31)
INT32_MAX = (2 ** 31) - 1
if not INT32_MIN <= lat_e7 <= INT32_MAX:
raise ValueError(f"lat_e7 {lat_e7} outside int32 range")
if not INT32_MIN <= lon_e7 <= INT32_MAX:
raise ValueError(f"lon_e7 {lon_e7} outside int32 range")
return lat_e7 * LAT_LON_SCALE, lon_e7 * LAT_LON_SCALE
def aggregate_validations(
records: Iterable[Mapping[str, object]],
) -> tuple[list[SchemaValidationResult], list[Wgs84ValidationResult]]:
"""Run schema + WGS84 validations over a record stream.
Used by FT-P-03 / FT-P-14 to assert "every record satisfies both
contracts" — typically against a single-image push (1 outbound
record) but stream-friendly for soak-test re-use.
"""
schemas: list[SchemaValidationResult] = []
wgs84s: list[Wgs84ValidationResult] = []
for rec in records:
schemas.append(validate_estimate_schema(rec))
lat = rec.get("lat")
lon = rec.get("lon")
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
wgs84s.append(validate_wgs84_range(float(lat), float(lon)))
else:
wgs84s.append(
Wgs84ValidationResult(
ok=False, lat_deg=None, lon_deg=None,
reason="missing or non-numeric lat/lon for WGS84 check",
)
)
return schemas, wgs84s