mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:21:12 +00:00
a644debdb7
- AZ-416 (FT-P-09-AP): fills mavproxy_tlog_reader.iter_messages with pymavlink body (AZ-406 surface kept); adds ap_contract_evaluator covering AC-1 (signing handshake <=5s), AC-2 (GPS_INPUT >=4.5 Hz), AC-3 (EK3_SRC1_POSXY=3), AC-4 (GPS_RAW_INT health >=80%); scenario forces fc_adapter=ardupilot. - AZ-417 (FT-P-09-iNav): msp_frame_observer covering AC-2 (MSP rate) and AC-3 (fix_type/provider/numSat); scenario forces fc_adapter=inav. - AZ-419 (FT-P-11): cold_start_evaluator covering AC-1 (operator manifest origin), AC-2 (FC EKF fallback), AC-3 (no-origin abort), AC-4 (bounded-delta conflict, ADR-010 Principle #11 amended); scenario parametrized on origin_source plus dedicated no-origin abort scenario. - All scenarios skip-gated on upstream frame_source_replay / imu_replay / fdr_reader / sitl_observer extensions. - +67 unit tests; full e2e unit suite: 460 passed. - K=3 cumulative review fired: PASS for batches 70-72. See _docs/03_implementation/batch_72_report.md, _docs/03_implementation/reviews/batch_72_review.md, _docs/03_implementation/cumulative_review_batches_70-72_cycle1_report.md. Co-authored-by: Cursor <cursoragent@cursor.com>
310 lines
11 KiB
Python
310 lines
11 KiB
Python
"""Cold-start initialization evaluation for FT-P-11 (AZ-419 / ADR-010 / AC-5.1).
|
||
|
||
ADR-010 splits cold-start into two paths:
|
||
|
||
* **Primary** (operator manifest, AZ-490): C12 bakes
|
||
``flight.takeoff_origin`` into the C10 Manifest from the operator-
|
||
authored mission; airborne C5 consumes it BEFORE any sensor sample
|
||
via ``set_takeoff_origin``. Used even when the FC EKF has no valid
|
||
GPS.
|
||
* **Secondary** (FC EKF, legacy AC-5.1): when the Manifest carries no
|
||
``takeoff_origin``, the SUT falls back to the FC EKF snapshot.
|
||
* **Bounded-delta conflict** (Principle #11 amended): both signals
|
||
present but ``|operator − fc_ekf| > 200 m`` → operator wins; FC GPS
|
||
is logged as suspect via a ``c5.gps_bounded_delta.reject`` FDR
|
||
record naming both points.
|
||
|
||
This helper owns the pure-logic side:
|
||
|
||
* ``write_manifest`` / ``read_manifest`` — manipulate the fixture
|
||
Manifest the test builder produces.
|
||
* ``read_cold_boot_fixture`` — parse the AZ-408 cold-boot snapshot
|
||
JSON into a typed ``ColdBootSnapshot``.
|
||
* ``evaluate_first_estimate`` — distance vs expected origin + source
|
||
label rules + FDR record presence checks.
|
||
|
||
Public-boundary discipline: does NOT import any
|
||
``src/gps_denied_onboard`` symbol.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Iterable, Mapping, Sequence
|
||
|
||
from .geo import distance_m
|
||
|
||
# Upper bound applied by ``FirstEstimateReport.passes_distance`` to the
# distance between the first estimate and the expected origin.
ACCURACY_BUDGET_M = 50.0  # AC-1/AC-2/AC-4: estimate within ±50 m of origin
# Operator-vs-FC-EKF separation above which the bounded-delta conflict rule
# applies (see module docstring). Not referenced by the code in this module;
# presumably consumed by the scenarios — verify against callers.
BOUNDED_DELTA_TRIGGER_M = 200.0  # ADR-010 Principle #11 amended
# NOTE(review): not referenced by the code in this module; by its name a
# time budget in seconds for the first emission — confirm against callers.
FIRST_EMISSION_BUDGET_S = 30.0  # AC-1/AC-NEW-1
# Source label the first estimate must NOT carry when
# ``evaluate_first_estimate`` runs with ``origin_source="bounded_delta_conflict"``.
FORBIDDEN_FIRST_LABEL_BOUNDED_DELTA = "satellite_anchored"
# FDR ``record_type`` values matched by ``_scan_fdr_for_cold_start``.
FDR_RECORD_ORIGIN_SET = "c5.cold_start_origin.set"
FDR_RECORD_ORIGIN_UNAVAILABLE = "c5.cold_start_origin.unavailable"
FDR_RECORD_BOUNDED_DELTA_REJECT = "c5.gps_bounded_delta.reject"
|
||
|
||
|
||
@dataclass(frozen=True)
class LatLonAlt:
    """Immutable geodetic position.

    Latitude and longitude are WGS84 degrees; altitude is in meters.
    """

    lat_deg: float
    lon_deg: float
    alt_m: float
|
||
|
||
|
||
@dataclass(frozen=True)
class ManifestOrigin:
    """Slice of the C10 Manifest that FT-P-11 cares about.

    Only the takeoff origin is modelled; it is ``None`` when the Manifest
    carries no ``takeoff_origin``.
    """

    takeoff_origin: LatLonAlt | None
|
||
|
||
|
||
@dataclass(frozen=True)
class ColdBootSnapshot:
    """FC EKF snapshot pose parsed from the AZ-408 cold-boot fixture.

    ``read_cold_boot_fixture`` fills the pose fields in degrees / meters
    and copies the fixture's ``_schema`` string into ``schema``.
    """

    lat_deg: float
    lon_deg: float
    alt_m: float
    schema: str
|
||
|
||
|
||
@dataclass(frozen=True)
class OutboundEstimate:
    """The first outbound position estimate the scenario observed.

    ``source_label`` is the label attached to the estimate; it is checked
    by ``evaluate_first_estimate`` in the bounded-delta-conflict case.
    """

    # presumably a monotonic-clock timestamp in milliseconds — confirm with producer
    monotonic_ms: int
    lat_deg: float
    lon_deg: float
    source_label: str
|
||
|
||
|
||
@dataclass(frozen=True)
class FdrAuditRecord:
    """A single FDR record considered by the cold-start audit.

    ``record_type`` is matched against the ``FDR_RECORD_*`` constants;
    ``payload`` holds the record's key/value body.
    """

    # presumably a monotonic-clock timestamp in milliseconds — confirm with producer
    monotonic_ms: int
    record_type: str
    payload: Mapping[str, object]
|
||
|
||
|
||
@dataclass(frozen=True)
class FirstEstimateReport:
    """Evidence gathered for AC-1 / AC-2 / AC-4.

    Bundles the measured distance to the expected origin, the outcome of
    the source-label rule, and what the FDR audit found.
    """

    origin_source: str
    expected_origin: LatLonAlt | None
    actual_estimate: OutboundEstimate | None
    # None when either the estimate or the expected origin was missing.
    distance_m: float | None
    source_label_ok: bool
    fdr_origin_set_seen: bool
    fdr_origin_set_source: str | None
    fdr_bounded_delta_seen: bool
    fdr_bounded_delta_a: LatLonAlt | None
    fdr_bounded_delta_b: LatLonAlt | None

    @property
    def passes_distance(self) -> bool:
        """True when a distance was measured and it is within the budget."""
        measured = self.distance_m
        if measured is None:
            # Nothing to compare: a missing distance never passes.
            return False
        return measured <= ACCURACY_BUDGET_M
|
||
|
||
|
||
@dataclass(frozen=True)
class NoOriginReport:
    """Evidence for AC-3: the SUT MUST refuse takeoff without an origin."""

    # True iff an estimate WAS produced — that is the failure mode here.
    estimate_within_budget: bool
    fdr_origin_unavailable_seen: bool

    @property
    def passes(self) -> bool:
        """True when no estimate was produced and the FDR logged the abort."""
        if self.estimate_within_budget:
            # Any emitted estimate violates AC-3 regardless of the FDR.
            return False
        return self.fdr_origin_unavailable_seen
|
||
|
||
|
||
def write_manifest(out_path: Path, takeoff_origin: LatLonAlt | None) -> Path:
    """Emit a minimal C10-Manifest-shaped JSON file for the fixture builder.

    The schema follows the AZ-323 canonical Manifest serialization only as
    far as needed for the SUT's ``set_takeoff_origin`` consumer to accept
    it. Field shape mirrors `_docs/02_document/contracts/c12_*`.

    Args:
        out_path: Destination file; missing parent directories are created.
        takeoff_origin: Origin to embed under ``flight.takeoff_origin``, or
            ``None`` to write an empty ``flight`` section.

    Returns:
        ``out_path``, unchanged.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    flight_section: dict[str, object] = {}
    if takeoff_origin is not None:
        flight_section["takeoff_origin"] = {
            "lat_deg": takeoff_origin.lat_deg,
            "lon_deg": takeoff_origin.lon_deg,
            "alt_m": takeoff_origin.alt_m,
        }

    document: dict[str, object] = {
        "_schema": "ft-p-11-test-manifest/v1",
        "flight": flight_section,
    }
    serialized = json.dumps(document, indent=2)
    out_path.write_text(serialized)
    return out_path
|
||
|
||
|
||
def read_manifest(manifest_path: Path) -> ManifestOrigin:
    """Read a Manifest JSON and extract the ``takeoff_origin`` if present.

    Args:
        manifest_path: Path to the Manifest JSON file.

    Returns:
        A ``ManifestOrigin`` whose ``takeoff_origin`` is ``None`` when the
        ``flight`` section or its ``takeoff_origin`` entry is missing or
        JSON ``null``; otherwise the origin converted to floats.

    Raises:
        FileNotFoundError: ``manifest_path`` does not exist.
        KeyError: ``takeoff_origin`` is present but lacks ``lat_deg``,
            ``lon_deg`` or ``alt_m``.
    """
    if not manifest_path.exists():
        raise FileNotFoundError(f"manifest not found: {manifest_path}")
    # JSON is UTF-8 by specification; do not depend on the host locale.
    payload = json.loads(manifest_path.read_text(encoding="utf-8"))
    flight = payload.get("flight")
    if flight is None:
        # An absent or explicit-null ``flight`` section means "no origin",
        # the same way a null ``takeoff_origin`` does below.
        flight = {}
    origin_raw = flight.get("takeoff_origin")
    if origin_raw is None:
        return ManifestOrigin(takeoff_origin=None)
    return ManifestOrigin(
        takeoff_origin=LatLonAlt(
            lat_deg=float(origin_raw["lat_deg"]),
            lon_deg=float(origin_raw["lon_deg"]),
            alt_m=float(origin_raw["alt_m"]),
        )
    )
|
||
|
||
|
||
def read_cold_boot_fixture(fixture_path: Path) -> ColdBootSnapshot:
    """Parse the AZ-408 cold-boot JSON into a typed snapshot.

    Converts the fixture's ``lat_e7 / lon_e7 / alt_mm`` (MAVLink int32
    units, 1e-7 deg + millimeters) to ``lat_deg / lon_deg / alt_m``.

    Args:
        fixture_path: Path to the cold-boot fixture JSON file.

    Returns:
        The converted pose plus the fixture's ``_schema`` string (empty
        string when the key is absent).

    Raises:
        FileNotFoundError: ``fixture_path`` does not exist.
        KeyError: ``global_position_int`` or one of its ``lat_e7`` /
            ``lon_e7`` / ``alt_mm`` entries is missing.
    """
    if not fixture_path.exists():
        raise FileNotFoundError(f"cold-boot fixture not found: {fixture_path}")
    # JSON is UTF-8 by specification; do not depend on the host locale.
    payload = json.loads(fixture_path.read_text(encoding="utf-8"))
    schema = str(payload.get("_schema", ""))
    pose = payload["global_position_int"]
    return ColdBootSnapshot(
        lat_deg=int(pose["lat_e7"]) / 1e7,
        lon_deg=int(pose["lon_e7"]) / 1e7,
        alt_m=int(pose["alt_mm"]) / 1000.0,
        schema=schema,
    )
|
||
|
||
|
||
def _scan_fdr_for_cold_start(
    fdr_records: Iterable[FdrAuditRecord],
) -> dict[str, object]:
    """Walk the FDR records once and gather every cold-start signal.

    When several records of the same type occur, later ones overwrite the
    values captured from earlier ones; a later record that lacks a value
    (no ``source``, non-mapping ``a`` / ``b``) leaves the earlier value
    in place.

    Returns:
        A dict with the keys ``origin_set_seen``, ``origin_set_source``,
        ``origin_unavailable_seen``, ``bounded_delta_seen``,
        ``bounded_delta_a`` and ``bounded_delta_b``.
    """

    def _as_point(raw: Mapping[str, object]) -> LatLonAlt:
        # Altitude is optional in the reject payload and defaults to 0.0.
        return LatLonAlt(
            lat_deg=float(raw["lat_deg"]),  # type: ignore[arg-type]
            lon_deg=float(raw["lon_deg"]),  # type: ignore[arg-type]
            alt_m=float(raw.get("alt_m", 0.0)),  # type: ignore[arg-type]
        )

    findings: dict[str, object] = {
        "origin_set_seen": False,
        "origin_set_source": None,
        "origin_unavailable_seen": False,
        "bounded_delta_seen": False,
        "bounded_delta_a": None,
        "bounded_delta_b": None,
    }

    for record in fdr_records:
        kind = record.record_type
        if kind == FDR_RECORD_ORIGIN_SET:
            findings["origin_set_seen"] = True
            declared_source = record.payload.get("source")
            if declared_source is not None:
                findings["origin_set_source"] = str(declared_source)
            continue
        if kind == FDR_RECORD_ORIGIN_UNAVAILABLE:
            findings["origin_unavailable_seen"] = True
            continue
        if kind == FDR_RECORD_BOUNDED_DELTA_REJECT:
            findings["bounded_delta_seen"] = True
            raw_a = record.payload.get("a")
            raw_b = record.payload.get("b")
            if isinstance(raw_a, Mapping):
                findings["bounded_delta_a"] = _as_point(raw_a)
            if isinstance(raw_b, Mapping):
                findings["bounded_delta_b"] = _as_point(raw_b)

    return findings
|
||
|
||
|
||
def evaluate_first_estimate(
    *,
    origin_source: str,
    expected_origin: LatLonAlt | None,
    first_estimate: OutboundEstimate | None,
    fdr_records: Sequence[FdrAuditRecord],
) -> FirstEstimateReport:
    """Collect the AC-1/AC-2/AC-4 evidence for the first outbound estimate.

    ``origin_source`` selects the acceptance criterion being exercised:

    * ``"operator_manifest"`` — AC-1: distance ≤50 m of A AND FDR has
      ``c5.cold_start_origin.set(source="manifest")``.
    * ``"fc_ekf"`` — AC-2: distance ≤50 m of FC EKF snapshot AND FDR
      has ``c5.cold_start_origin.set(source="fc_ekf")``.
    * ``"bounded_delta_conflict"`` — AC-4: distance ≤50 m of A;
      source_label != ``satellite_anchored``; FDR has
      ``c5.gps_bounded_delta.reject`` naming both A and B.

    The function only gathers the facts (distance, label verdict, FDR
    findings) into the report; it does not itself compare the FDR source
    against ``origin_source``.

    Raises:
        ValueError: ``origin_source`` is not one of the three values above.
    """
    recognised = {"operator_manifest", "fc_ekf", "bounded_delta_conflict"}
    if origin_source not in recognised:
        message = (
            f"unknown origin_source {origin_source!r}; expected one of "
            "{operator_manifest, fc_ekf, bounded_delta_conflict}"
        )
        raise ValueError(message)

    # A distance exists only when both end points are known.
    can_measure = first_estimate is not None and expected_origin is not None
    distance = (
        distance_m(
            expected_origin.lat_deg,
            expected_origin.lon_deg,
            first_estimate.lat_deg,
            first_estimate.lon_deg,
        )
        if can_measure
        else None
    )

    # Without an estimate the label rule fails; any label is acceptable for
    # AC-1/AC-2, while the conflict case forbids one specific label.
    label_ok = first_estimate is not None
    if label_ok and origin_source == "bounded_delta_conflict":
        label_ok = first_estimate.source_label != FORBIDDEN_FIRST_LABEL_BOUNDED_DELTA

    findings = _scan_fdr_for_cold_start(fdr_records)

    return FirstEstimateReport(
        origin_source=origin_source,
        expected_origin=expected_origin,
        actual_estimate=first_estimate,
        distance_m=distance,
        source_label_ok=label_ok,
        fdr_origin_set_seen=bool(findings["origin_set_seen"]),
        fdr_origin_set_source=findings["origin_set_source"],  # type: ignore[arg-type]
        fdr_bounded_delta_seen=bool(findings["bounded_delta_seen"]),
        fdr_bounded_delta_a=findings["bounded_delta_a"],  # type: ignore[arg-type]
        fdr_bounded_delta_b=findings["bounded_delta_b"],  # type: ignore[arg-type]
    )
|
||
|
||
|
||
def evaluate_no_origin_path(
    *,
    first_estimate: OutboundEstimate | None,
    fdr_records: Sequence[FdrAuditRecord],
) -> NoOriginReport:
    """AC-3: Manifest empty + SITL no GPS → SUT must NOT emit anything.

    The returned report has ``passes=True`` iff no outbound estimate was
    produced AND the FDR carries ``c5.cold_start_origin.unavailable``.
    """
    findings = _scan_fdr_for_cold_start(fdr_records)
    estimate_was_emitted = first_estimate is not None
    abort_was_logged = bool(findings["origin_unavailable_seen"])
    return NoOriginReport(
        estimate_within_budget=estimate_was_emitted,
        fdr_origin_unavailable_seen=abort_was_logged,
    )
|
||
|
||
|
||
def bounded_delta_distance_m(a: LatLonAlt, b: LatLonAlt) -> float:
    """Return the horizontal separation of ``a`` and ``b`` via ``distance_m``.

    Convenience for AC-4, whose trigger condition is
    ``vincenty(A, B) > 200 m``. Altitude is not passed to ``distance_m``.
    NOTE(review): whether ``distance_m`` is actually Vincenty cannot be
    confirmed from this module — check ``.geo``.
    """
    lat_a, lon_a = a.lat_deg, a.lon_deg
    lat_b, lon_b = b.lat_deg, b.lon_deg
    return distance_m(lat_a, lon_a, lat_b, lon_b)
|