"""Cold-start initialization evaluation for FT-P-11 (AZ-419 / ADR-010 / AC-5.1). ADR-010 splits cold-start into two paths: * **Primary** (operator manifest, AZ-490): C12 bakes ``flight.takeoff_origin`` into the C10 Manifest from the operator- authored mission; airborne C5 consumes it BEFORE any sensor sample via ``set_takeoff_origin``. Used even when the FC EKF has no valid GPS. * **Secondary** (FC EKF, legacy AC-5.1): when the Manifest carries no ``takeoff_origin``, the SUT falls back to the FC EKF snapshot. * **Bounded-delta conflict** (Principle #11 amended): both signals present but ``|operator − fc_ekf| > 200 m`` → operator wins; FC GPS is logged as suspect via a ``c5.gps_bounded_delta.reject`` FDR record naming both points. This helper owns the pure-logic side: * ``write_manifest`` / ``read_manifest`` — manipulate the fixture Manifest the test builder produces. * ``read_cold_boot_fixture`` — parse the AZ-408 cold-boot snapshot JSON into a typed ``ColdBootSnapshot``. * ``evaluate_first_estimate`` — distance vs expected origin + source label rules + FDR record presence checks. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import json from dataclasses import dataclass from pathlib import Path from typing import Iterable, Mapping, Sequence from .geo import distance_m ACCURACY_BUDGET_M = 50.0 # AC-1/AC-2/AC-4: estimate within ±50 m of origin BOUNDED_DELTA_TRIGGER_M = 200.0 # ADR-010 Principle #11 amended FIRST_EMISSION_BUDGET_S = 30.0 # AC-1/AC-NEW-1 FORBIDDEN_FIRST_LABEL_BOUNDED_DELTA = "satellite_anchored" FDR_RECORD_ORIGIN_SET = "c5.cold_start_origin.set" FDR_RECORD_ORIGIN_UNAVAILABLE = "c5.cold_start_origin.unavailable" FDR_RECORD_BOUNDED_DELTA_REJECT = "c5.gps_bounded_delta.reject" @dataclass(frozen=True) class LatLonAlt: """One geodetic point: WGS84 degrees + altitude in meters.""" lat_deg: float lon_deg: float alt_m: float @dataclass(frozen=True) class ManifestOrigin: """A subset of the C10 Manifest for FT-P-11 — just the takeoff_origin.""" takeoff_origin: LatLonAlt | None @dataclass(frozen=True) class ColdBootSnapshot: """Parsed AZ-408 cold-boot fixture (FC EKF snapshot pose).""" lat_deg: float lon_deg: float alt_m: float schema: str @dataclass(frozen=True) class OutboundEstimate: """First outbound estimate observed by the scenario.""" monotonic_ms: int lat_deg: float lon_deg: float source_label: str @dataclass(frozen=True) class FdrAuditRecord: """One FDR record relevant to cold-start auditing.""" monotonic_ms: int record_type: str payload: Mapping[str, object] @dataclass(frozen=True) class FirstEstimateReport: """AC-1 / AC-2 / AC-4: distance + label + FDR record audit.""" origin_source: str expected_origin: LatLonAlt | None actual_estimate: OutboundEstimate | None distance_m: float | None source_label_ok: bool fdr_origin_set_seen: bool fdr_origin_set_source: str | None fdr_bounded_delta_seen: bool fdr_bounded_delta_a: LatLonAlt | None fdr_bounded_delta_b: LatLonAlt | None @property def passes_distance(self) -> bool: return ( self.distance_m is not None and self.distance_m <= ACCURACY_BUDGET_M ) @dataclass(frozen=True) class NoOriginReport: """AC-3: SUT MUST refuse takeoff when no origin is available.""" estimate_within_budget: bool # True iff an estimate WAS produced — failure mode fdr_origin_unavailable_seen: bool @property def passes(self) -> bool: # AC-3 passes when NO estimate was produced AND the FDR records # the takeoff-abort signal. return not self.estimate_within_budget and self.fdr_origin_unavailable_seen def write_manifest(out_path: Path, takeoff_origin: LatLonAlt | None) -> Path: """Write a minimal C10-Manifest-shaped JSON for the test fixture builder. The schema mirrors the AZ-323 canonical Manifest serialization just closely enough that the SUT's ``set_takeoff_origin`` consumer accepts it. Field shape mirrors `_docs/02_document/contracts/c12_*`. """ out_path.parent.mkdir(parents=True, exist_ok=True) payload: dict[str, object] = {"_schema": "ft-p-11-test-manifest/v1"} if takeoff_origin is not None: payload["flight"] = { "takeoff_origin": { "lat_deg": takeoff_origin.lat_deg, "lon_deg": takeoff_origin.lon_deg, "alt_m": takeoff_origin.alt_m, } } else: payload["flight"] = {} out_path.write_text(json.dumps(payload, indent=2)) return out_path def read_manifest(manifest_path: Path) -> ManifestOrigin: """Read a Manifest JSON and extract the ``takeoff_origin`` if present.""" if not manifest_path.exists(): raise FileNotFoundError(f"manifest not found: {manifest_path}") payload = json.loads(manifest_path.read_text()) origin_raw = payload.get("flight", {}).get("takeoff_origin") if origin_raw is None: return ManifestOrigin(takeoff_origin=None) return ManifestOrigin( takeoff_origin=LatLonAlt( lat_deg=float(origin_raw["lat_deg"]), lon_deg=float(origin_raw["lon_deg"]), alt_m=float(origin_raw["alt_m"]), ) ) def read_cold_boot_fixture(fixture_path: Path) -> ColdBootSnapshot: """Parse the AZ-408 cold-boot JSON into a typed snapshot. Converts the fixture's ``lat_e7 / lon_e7 / alt_mm`` (MAVLink int32 units, 1e-7 deg + millimeters) to ``lat_deg / lon_deg / alt_m``. """ if not fixture_path.exists(): raise FileNotFoundError(f"cold-boot fixture not found: {fixture_path}") payload = json.loads(fixture_path.read_text()) schema = str(payload.get("_schema", "")) pose = payload["global_position_int"] return ColdBootSnapshot( lat_deg=int(pose["lat_e7"]) / 1e7, lon_deg=int(pose["lon_e7"]) / 1e7, alt_m=int(pose["alt_mm"]) / 1000.0, schema=schema, ) def _scan_fdr_for_cold_start( fdr_records: Iterable[FdrAuditRecord], ) -> dict[str, object]: """Single pass collecting all cold-start-relevant FDR signals.""" origin_set_source: str | None = None origin_set_seen = False origin_unavailable_seen = False bounded_delta_seen = False bounded_delta_a: LatLonAlt | None = None bounded_delta_b: LatLonAlt | None = None for r in fdr_records: if r.record_type == FDR_RECORD_ORIGIN_SET: origin_set_seen = True src = r.payload.get("source") if src is not None: origin_set_source = str(src) elif r.record_type == FDR_RECORD_ORIGIN_UNAVAILABLE: origin_unavailable_seen = True elif r.record_type == FDR_RECORD_BOUNDED_DELTA_REJECT: bounded_delta_seen = True a = r.payload.get("a") b = r.payload.get("b") if isinstance(a, Mapping): bounded_delta_a = LatLonAlt( lat_deg=float(a["lat_deg"]), # type: ignore[arg-type] lon_deg=float(a["lon_deg"]), # type: ignore[arg-type] alt_m=float(a.get("alt_m", 0.0)), # type: ignore[arg-type] ) if isinstance(b, Mapping): bounded_delta_b = LatLonAlt( lat_deg=float(b["lat_deg"]), # type: ignore[arg-type] lon_deg=float(b["lon_deg"]), # type: ignore[arg-type] alt_m=float(b.get("alt_m", 0.0)), # type: ignore[arg-type] ) return { "origin_set_seen": origin_set_seen, "origin_set_source": origin_set_source, "origin_unavailable_seen": origin_unavailable_seen, "bounded_delta_seen": bounded_delta_seen, "bounded_delta_a": bounded_delta_a, "bounded_delta_b": bounded_delta_b, } def evaluate_first_estimate( *, origin_source: str, expected_origin: LatLonAlt | None, first_estimate: OutboundEstimate | None, fdr_records: Sequence[FdrAuditRecord], ) -> FirstEstimateReport: """Evaluate AC-1/AC-2/AC-4 given the first observed outbound estimate. ``origin_source`` is one of: * ``"operator_manifest"`` — AC-1: distance ≤50 m of A AND FDR has ``c5.cold_start_origin.set(source="manifest")``. * ``"fc_ekf"`` — AC-2: distance ≤50 m of FC EKF snapshot AND FDR has ``c5.cold_start_origin.set(source="fc_ekf")``. * ``"bounded_delta_conflict"`` — AC-4: distance ≤50 m of A; source_label != ``satellite_anchored``; FDR has ``c5.gps_bounded_delta.reject`` naming both A and B. Any other source string raises ``ValueError``. """ if origin_source not in {"operator_manifest", "fc_ekf", "bounded_delta_conflict"}: raise ValueError( f"unknown origin_source {origin_source!r}; expected one of " "{operator_manifest, fc_ekf, bounded_delta_conflict}" ) distance: float | None = None if first_estimate is not None and expected_origin is not None: distance = distance_m( expected_origin.lat_deg, expected_origin.lon_deg, first_estimate.lat_deg, first_estimate.lon_deg, ) if origin_source == "bounded_delta_conflict": label_ok = ( first_estimate is not None and first_estimate.source_label != FORBIDDEN_FIRST_LABEL_BOUNDED_DELTA ) else: label_ok = first_estimate is not None # any label acceptable for AC-1/AC-2 audit = _scan_fdr_for_cold_start(fdr_records) return FirstEstimateReport( origin_source=origin_source, expected_origin=expected_origin, actual_estimate=first_estimate, distance_m=distance, source_label_ok=label_ok, fdr_origin_set_seen=bool(audit["origin_set_seen"]), fdr_origin_set_source=audit["origin_set_source"], # type: ignore[arg-type] fdr_bounded_delta_seen=bool(audit["bounded_delta_seen"]), fdr_bounded_delta_a=audit["bounded_delta_a"], # type: ignore[arg-type] fdr_bounded_delta_b=audit["bounded_delta_b"], # type: ignore[arg-type] ) def evaluate_no_origin_path( *, first_estimate: OutboundEstimate | None, fdr_records: Sequence[FdrAuditRecord], ) -> NoOriginReport: """AC-3: Manifest empty + SITL no GPS → SUT must NOT emit anything. Returns ``passes=True`` iff no outbound estimate was produced AND the FDR carries ``c5.cold_start_origin.unavailable``. """ audit = _scan_fdr_for_cold_start(fdr_records) return NoOriginReport( estimate_within_budget=first_estimate is not None, fdr_origin_unavailable_seen=bool(audit["origin_unavailable_seen"]), ) def bounded_delta_distance_m(a: LatLonAlt, b: LatLonAlt) -> float: """Convenience: AC-4 trigger condition is ``vincenty(A, B) > 200 m``.""" return distance_m(a.lat_deg, a.lon_deg, b.lat_deg, b.lon_deg)