mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:01:13 +00:00
a644debdb7
- AZ-416 (FT-P-09-AP): fills mavproxy_tlog_reader.iter_messages with pymavlink body (AZ-406 surface kept); adds ap_contract_evaluator covering AC-1 (signing handshake <=5s), AC-2 (GPS_INPUT >=4.5 Hz), AC-3 (EK3_SRC1_POSXY=3), AC-4 (GPS_RAW_INT health >=80%); scenario forces fc_adapter=ardupilot. - AZ-417 (FT-P-09-iNav): msp_frame_observer covering AC-2 (MSP rate) and AC-3 (fix_type/provider/numSat); scenario forces fc_adapter=inav. - AZ-419 (FT-P-11): cold_start_evaluator covering AC-1 (operator manifest origin), AC-2 (FC EKF fallback), AC-3 (no-origin abort), AC-4 (bounded-delta conflict, ADR-010 Principle #11 amended); scenario parametrized on origin_source plus dedicated no-origin abort scenario. - All scenarios skip-gated on upstream frame_source_replay / imu_replay / fdr_reader / sitl_observer extensions. - +67 unit tests; full e2e unit suite: 460 passed. - K=3 cumulative review fired: PASS for batches 70-72. See _docs/03_implementation/batch_72_report.md, _docs/03_implementation/reviews/batch_72_review.md, _docs/03_implementation/cumulative_review_batches_70-72_cycle1_report.md. Co-authored-by: Cursor <cursoragent@cursor.com>
241 lines
7.9 KiB
Python
241 lines
7.9 KiB
Python
"""ArduPilot contract + signing-handshake evaluation for FT-P-09-AP (AZ-416).
|
||
|
||
Given the captured ``.tlog`` from ``mavproxy-listener`` plus a single
|
||
EK3_SRC1_POSXY parameter read, this helper validates:
|
||
|
||
* AC-1: signing handshake completes within ≤5 s
|
||
(``observe_signing_handshake`` — first signed message within the
|
||
window OR absence of ``BAD_SIGNATURE`` STATUSTEXT during it).
|
||
* AC-2: GPS_INPUT flow at ≥4.5 Hz over the 60 s replay
|
||
(``compute_gps_input_rate``).
|
||
* AC-3: EK3_SRC1_POSXY == 3 (``validate_ek3_src1_posxy`` — pure check
|
||
on the param value the caller fetched via mavproxy).
|
||
* AC-4: GPS_RAW_INT health — ``fix_type ≥ 3`` AND ``eph ≤ 200``
|
||
(HDOP ≤ 2.0) for ≥80 % of the 60 s window
|
||
(``evaluate_gps_raw_int_health``).
|
||
|
||
All inputs are pure ``Iterable[TlogMessage]``; the tlog ingestion is
|
||
delegated to ``runner.helpers.mavproxy_tlog_reader.iter_messages``.
|
||
|
||
Public-boundary discipline: does NOT import any
|
||
``src/gps_denied_onboard`` symbol.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
from typing import Iterable, Sequence
|
||
|
||
from .mavproxy_tlog_reader import TlogMessage
|
||
|
||
HANDSHAKE_BUDGET_S = 5.0
|
||
GPS_INPUT_TARGET_RATE_HZ = 5.0
|
||
GPS_INPUT_MIN_RATE_HZ = 4.5
|
||
GPS_RAW_INT_MIN_FIX_TYPE = 3
|
||
GPS_RAW_INT_MAX_EPH = 200 # HDOP × 100 ≤ 200 → HDOP ≤ 2.0
|
||
GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED = 0.80
|
||
EK3_SRC1_POSXY_REQUIRED = 3 # AP EKF source-set: 3 = GPS
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class HandshakeReport:
|
||
"""AC-1: signing-handshake completion observation."""
|
||
|
||
window_start_us: int
|
||
window_end_us: int
|
||
first_signed_us: int | None
|
||
bad_signature_count: int
|
||
setup_signing_seen: bool
|
||
|
||
@property
|
||
def lag_s(self) -> float | None:
|
||
if self.first_signed_us is None:
|
||
return None
|
||
return (self.first_signed_us - self.window_start_us) / 1_000_000.0
|
||
|
||
@property
|
||
def passes(self) -> bool:
|
||
return (
|
||
self.first_signed_us is not None
|
||
and self.lag_s is not None
|
||
and self.lag_s <= HANDSHAKE_BUDGET_S
|
||
and self.bad_signature_count == 0
|
||
)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class GpsInputRateReport:
|
||
"""AC-2: GPS_INPUT rate over the replay window."""
|
||
|
||
frame_count: int
|
||
window_us: int
|
||
observed_rate_hz: float
|
||
target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ
|
||
min_required_hz: float = GPS_INPUT_MIN_RATE_HZ
|
||
|
||
@property
|
||
def passes(self) -> bool:
|
||
return (
|
||
self.window_us > 0
|
||
and self.observed_rate_hz >= self.min_required_hz
|
||
)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class GpsRawIntHealthReport:
|
||
"""AC-4: GPS_RAW_INT fix_type + eph healthy fraction."""
|
||
|
||
total_samples: int
|
||
healthy_samples: int
|
||
fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED
|
||
|
||
@property
|
||
def healthy_fraction(self) -> float:
|
||
if self.total_samples == 0:
|
||
return 0.0
|
||
return self.healthy_samples / self.total_samples
|
||
|
||
@property
|
||
def passes(self) -> bool:
|
||
return (
|
||
self.total_samples > 0
|
||
and self.healthy_fraction >= self.fraction_required
|
||
)
|
||
|
||
|
||
def observe_signing_handshake(
|
||
messages: Iterable[TlogMessage],
|
||
*,
|
||
handshake_window_us: int = int(HANDSHAKE_BUDGET_S * 1_000_000),
|
||
) -> HandshakeReport:
|
||
"""AC-1: first signed message within ``handshake_window_us``.
|
||
|
||
The handshake window starts at the FIRST observed message's
|
||
timestamp (the SUT cannot be heard from before that). The result
|
||
PASSES if a signed message arrives within the window AND no
|
||
``STATUSTEXT`` with ``BAD_SIGNATURE`` is observed during it.
|
||
|
||
The SETUP_SIGNING handshake exchange itself is unsigned by spec
|
||
(it's how the key is shared), so its presence is reported but does
|
||
NOT gate the pass — the gate is the first SIGNED follow-up.
|
||
"""
|
||
if handshake_window_us <= 0:
|
||
raise ValueError(f"handshake_window_us must be > 0, got {handshake_window_us}")
|
||
window_start: int | None = None
|
||
window_end: int | None = None
|
||
first_signed_us: int | None = None
|
||
bad_sig_count = 0
|
||
setup_signing_seen = False
|
||
|
||
for m in messages:
|
||
if window_start is None:
|
||
window_start = m.timestamp_us
|
||
window_end = window_start + handshake_window_us
|
||
if window_end is not None and m.timestamp_us > window_end:
|
||
break
|
||
if m.msg_type == "SETUP_SIGNING":
|
||
setup_signing_seen = True
|
||
if m.signed and first_signed_us is None:
|
||
first_signed_us = m.timestamp_us
|
||
if m.msg_type == "STATUSTEXT":
|
||
text = str(m.fields.get("text", "")).upper()
|
||
if "BAD_SIGNATURE" in text:
|
||
bad_sig_count += 1
|
||
|
||
return HandshakeReport(
|
||
window_start_us=window_start or 0,
|
||
window_end_us=window_end or 0,
|
||
first_signed_us=first_signed_us,
|
||
bad_signature_count=bad_sig_count,
|
||
setup_signing_seen=setup_signing_seen,
|
||
)
|
||
|
||
|
||
def compute_gps_input_rate(
|
||
messages: Iterable[TlogMessage],
|
||
*,
|
||
target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ,
|
||
min_required_hz: float = GPS_INPUT_MIN_RATE_HZ,
|
||
) -> GpsInputRateReport:
|
||
"""AC-2: GPS_INPUT cadence over the entire message stream."""
|
||
if min_required_hz < 0:
|
||
raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}")
|
||
timestamps = [m.timestamp_us for m in messages if m.msg_type == "GPS_INPUT"]
|
||
if len(timestamps) < 2:
|
||
return GpsInputRateReport(
|
||
frame_count=len(timestamps),
|
||
window_us=0,
|
||
observed_rate_hz=0.0,
|
||
target_rate_hz=target_rate_hz,
|
||
min_required_hz=min_required_hz,
|
||
)
|
||
window_us = timestamps[-1] - timestamps[0]
|
||
if window_us <= 0:
|
||
return GpsInputRateReport(
|
||
frame_count=len(timestamps),
|
||
window_us=window_us,
|
||
observed_rate_hz=0.0,
|
||
target_rate_hz=target_rate_hz,
|
||
min_required_hz=min_required_hz,
|
||
)
|
||
observed = (len(timestamps) - 1) / (window_us / 1_000_000.0)
|
||
return GpsInputRateReport(
|
||
frame_count=len(timestamps),
|
||
window_us=window_us,
|
||
observed_rate_hz=observed,
|
||
target_rate_hz=target_rate_hz,
|
||
min_required_hz=min_required_hz,
|
||
)
|
||
|
||
|
||
def validate_ek3_src1_posxy(value: int) -> bool:
|
||
"""AC-3: EK3_SRC1_POSXY must equal 3 (GPS source)."""
|
||
return value == EK3_SRC1_POSXY_REQUIRED
|
||
|
||
|
||
def evaluate_gps_raw_int_health(
|
||
messages: Iterable[TlogMessage],
|
||
*,
|
||
min_fix_type: int = GPS_RAW_INT_MIN_FIX_TYPE,
|
||
max_eph: int = GPS_RAW_INT_MAX_EPH,
|
||
fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED,
|
||
) -> GpsRawIntHealthReport:
|
||
"""AC-4: ≥``fraction_required`` of GPS_RAW_INT samples must be healthy.
|
||
|
||
A sample is "healthy" iff ``fix_type ≥ min_fix_type`` AND
|
||
``eph ≤ max_eph``. Both must hold per the spec text.
|
||
"""
|
||
if not 0.0 <= fraction_required <= 1.0:
|
||
raise ValueError(
|
||
f"fraction_required must be in [0, 1], got {fraction_required}"
|
||
)
|
||
total = 0
|
||
healthy = 0
|
||
for m in messages:
|
||
if m.msg_type != "GPS_RAW_INT":
|
||
continue
|
||
total += 1
|
||
try:
|
||
fix_type = int(m.fields["fix_type"]) # type: ignore[arg-type]
|
||
eph = int(m.fields["eph"]) # type: ignore[arg-type]
|
||
except (KeyError, TypeError, ValueError):
|
||
continue
|
||
if fix_type >= min_fix_type and eph <= max_eph:
|
||
healthy += 1
|
||
return GpsRawIntHealthReport(
|
||
total_samples=total,
|
||
healthy_samples=healthy,
|
||
fraction_required=fraction_required,
|
||
)
|
||
|
||
|
||
def collect_messages_to_list(messages: Iterable[TlogMessage]) -> list[TlogMessage]:
|
||
"""Materialise an iterator into a list — convenience for multi-pass eval.
|
||
|
||
The scenario reads the tlog once via ``iter_messages`` and runs
|
||
multiple analyzers over the result. ``iter_messages`` returns a
|
||
generator that closes its underlying pymavlink connection on
|
||
exhaustion, so re-iteration is not safe without materialisation.
|
||
"""
|
||
return list(messages)
|