Files
gps-denied-onboard/e2e/runner/helpers/ap_contract_evaluator.py
T
Oleksandr Bezdieniezhnykh a644debdb7 [AZ-416] [AZ-417] [AZ-419] Test batch 72: FT-P-09 AP/iNav + FT-P-11 cold start
- AZ-416 (FT-P-09-AP): fills mavproxy_tlog_reader.iter_messages with
  pymavlink body (AZ-406 surface kept); adds ap_contract_evaluator
  covering AC-1 (signing handshake <=5s), AC-2 (GPS_INPUT >=4.5 Hz),
  AC-3 (EK3_SRC1_POSXY=3), AC-4 (GPS_RAW_INT health >=80%); scenario
  forces fc_adapter=ardupilot.
- AZ-417 (FT-P-09-iNav): msp_frame_observer covering AC-2 (MSP rate)
  and AC-3 (fix_type/provider/numSat); scenario forces
  fc_adapter=inav.
- AZ-419 (FT-P-11): cold_start_evaluator covering AC-1 (operator
  manifest origin), AC-2 (FC EKF fallback), AC-3 (no-origin abort),
  AC-4 (bounded-delta conflict, ADR-010 Principle #11 amended);
  scenario parametrized on origin_source plus dedicated no-origin
  abort scenario.
- All scenarios skip-gated on upstream frame_source_replay /
  imu_replay / fdr_reader / sitl_observer extensions.
- +67 unit tests; full e2e unit suite: 460 passed.
- K=3 cumulative review fired: PASS for batches 70-72.

See _docs/03_implementation/batch_72_report.md,
_docs/03_implementation/reviews/batch_72_review.md,
_docs/03_implementation/cumulative_review_batches_70-72_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 07:49:17 +03:00

241 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ArduPilot contract + signing-handshake evaluation for FT-P-09-AP (AZ-416).
Given the captured ``.tlog`` from ``mavproxy-listener`` plus a single
EK3_SRC1_POSXY parameter read, this helper validates:
* AC-1: signing handshake completes within ≤5 s
(``observe_signing_handshake`` — first signed message within the
window AND no ``BAD_SIGNATURE`` STATUSTEXT observed during it).
* AC-2: GPS_INPUT flow at ≥4.5 Hz over the 60 s replay
(``compute_gps_input_rate``).
* AC-3: EK3_SRC1_POSXY == 3 (``validate_ek3_src1_posxy`` — pure check
on the param value the caller fetched via mavproxy).
* AC-4: GPS_RAW_INT health — ``fix_type ≥ 3`` AND ``eph ≤ 200``
(HDOP ≤ 2.0) for ≥80 % of the 60 s window
(``evaluate_gps_raw_int_health``).
All inputs are pure ``Iterable[TlogMessage]``; the tlog ingestion is
delegated to ``runner.helpers.mavproxy_tlog_reader.iter_messages``.
Public-boundary discipline: does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable, Sequence
from .mavproxy_tlog_reader import TlogMessage
# AC-1: maximum lag (seconds) between the first observed message and the
# first signed message. Used by ``HandshakeReport.passes`` and as the default
# observation window of ``observe_signing_handshake``.
HANDSHAKE_BUDGET_S = 5.0
# AC-2: nominal GPS_INPUT rate. Carried in ``GpsInputRateReport`` for
# reporting only; the pass gate compares against the minimum below.
GPS_INPUT_TARGET_RATE_HZ = 5.0
# AC-2: lowest observed GPS_INPUT rate that still passes.
GPS_INPUT_MIN_RATE_HZ = 4.5
# AC-4: a GPS_RAW_INT sample is healthy only if ``fix_type`` is at least this.
GPS_RAW_INT_MIN_FIX_TYPE = 3
# AC-4: a GPS_RAW_INT sample is healthy only if ``eph`` is at most this.
GPS_RAW_INT_MAX_EPH = 200 # HDOP × 100 ≤ 200 → HDOP ≤ 2.0
# AC-4: minimum share of GPS_RAW_INT samples that must be healthy.
GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED = 0.80
# AC-3: the only accepted value of the EK3_SRC1_POSXY parameter.
EK3_SRC1_POSXY_REQUIRED = 3 # AP EKF source-set: 3 = GPS
@dataclass(frozen=True)
class HandshakeReport:
    """Immutable result of the AC-1 signing-handshake observation.

    Produced by ``observe_signing_handshake``; all timestamps are in
    microseconds on the same clock as the observed messages.
    """

    # Timestamp of the first observed message (0 when nothing was observed).
    window_start_us: int
    # ``window_start_us`` plus the observation window (0 when nothing was
    # observed).
    window_end_us: int
    # Timestamp of the first signed message inside the window, if any.
    first_signed_us: int | None
    # Number of STATUSTEXT messages mentioning BAD_SIGNATURE inside the window.
    bad_signature_count: int
    # Whether a SETUP_SIGNING message was seen; informational, never gates.
    setup_signing_seen: bool

    @property
    def lag_s(self) -> float | None:
        """Seconds from window start to the first signed message.

        Returns ``None`` when no signed message was recorded.
        """
        signed_at = self.first_signed_us
        if signed_at is not None:
            return (signed_at - self.window_start_us) / 1_000_000.0
        return None

    @property
    def passes(self) -> bool:
        """True iff a signed message arrived within budget with no bad signatures."""
        lag = self.lag_s
        # ``lag`` is None exactly when no signed message was recorded.
        if self.first_signed_us is None or lag is None:
            return False
        within_budget = lag <= HANDSHAKE_BUDGET_S
        return within_budget and self.bad_signature_count == 0
@dataclass(frozen=True)
class GpsInputRateReport:
    """Immutable result of the AC-2 GPS_INPUT rate measurement."""

    # Number of GPS_INPUT messages counted.
    frame_count: int
    # Span in microseconds between the first and last GPS_INPUT timestamps
    # (0 when fewer than two frames were seen).
    window_us: int
    # Measured rate in Hz; 0.0 when it could not be computed.
    observed_rate_hz: float
    # Nominal rate, kept for reporting only.
    target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ
    # Threshold the observed rate must reach for the report to pass.
    min_required_hz: float = GPS_INPUT_MIN_RATE_HZ

    @property
    def passes(self) -> bool:
        """True iff the window is non-empty and the rate meets the minimum."""
        if not (self.window_us > 0):
            return False
        return self.observed_rate_hz >= self.min_required_hz
@dataclass(frozen=True)
class GpsRawIntHealthReport:
    """Immutable result of the AC-4 GPS_RAW_INT health evaluation."""

    # Number of GPS_RAW_INT messages seen, including malformed ones.
    total_samples: int
    # Number of those messages that satisfied the health criteria.
    healthy_samples: int
    # Minimum healthy share required for the report to pass.
    fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED

    @property
    def healthy_fraction(self) -> float:
        """Share of healthy samples; 0.0 when there are no samples."""
        count = self.total_samples
        return self.healthy_samples / count if count != 0 else 0.0

    @property
    def passes(self) -> bool:
        """True iff at least one sample exists and enough of them are healthy."""
        if not (self.total_samples > 0):
            return False
        return self.healthy_fraction >= self.fraction_required
def observe_signing_handshake(
    messages: Iterable[TlogMessage],
    *,
    handshake_window_us: int = int(HANDSHAKE_BUDGET_S * 1_000_000),
) -> HandshakeReport:
    """AC-1: look for the first signed message inside the handshake window.

    The window opens at the timestamp of the first message in
    ``messages`` and closes ``handshake_window_us`` microseconds later.
    Iteration stops at the first message whose timestamp lies past the
    window's end, so later messages are never inspected.

    Within the window this records the timestamp of the first signed
    message, the number of ``STATUSTEXT`` messages whose text contains
    ``BAD_SIGNATURE`` (case-insensitive), and whether a ``SETUP_SIGNING``
    message appeared. ``SETUP_SIGNING`` is reported only; it does not
    take part in the pass/fail decision made by ``HandshakeReport``.

    Args:
        messages: Messages to scan, in the order they should be observed.
        handshake_window_us: Window length in microseconds; must be > 0.

    Returns:
        A ``HandshakeReport``. When ``messages`` is empty both window
        bounds are reported as 0.

    Raises:
        ValueError: If ``handshake_window_us`` is not positive.
    """
    if handshake_window_us <= 0:
        raise ValueError(f"handshake_window_us must be > 0, got {handshake_window_us}")

    opened_at: int | None = None
    closes_at: int | None = None
    earliest_signed: int | None = None
    rejected = 0
    saw_setup = False

    for msg in messages:
        stamp = msg.timestamp_us
        if opened_at is None:
            # The first message anchors the window; it can never lie past
            # the end because the window length is strictly positive.
            opened_at = stamp
            closes_at = opened_at + handshake_window_us
        elif stamp > closes_at:
            break

        kind = msg.msg_type
        if kind == "SETUP_SIGNING":
            saw_setup = True
        if msg.signed and earliest_signed is None:
            earliest_signed = stamp
        if kind == "STATUSTEXT":
            if "BAD_SIGNATURE" in str(msg.fields.get("text", "")).upper():
                rejected += 1

    return HandshakeReport(
        window_start_us=opened_at or 0,
        window_end_us=closes_at or 0,
        first_signed_us=earliest_signed,
        bad_signature_count=rejected,
        setup_signing_seen=saw_setup,
    )
def compute_gps_input_rate(
    messages: Iterable[TlogMessage],
    *,
    target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ,
    min_required_hz: float = GPS_INPUT_MIN_RATE_HZ,
) -> GpsInputRateReport:
    """AC-2: measure the GPS_INPUT rate across the whole message stream.

    The rate is the number of intervals (frames minus one) divided by
    the time between the first and last GPS_INPUT timestamps. With
    fewer than two frames the window is reported as 0 and the rate as
    0.0; with a non-positive span the span is reported as-is and the
    rate as 0.0.

    Args:
        messages: Messages to scan; only ``GPS_INPUT`` ones are counted.
        target_rate_hz: Nominal rate, copied into the report.
        min_required_hz: Pass threshold, copied into the report; must
            be non-negative.

    Returns:
        A ``GpsInputRateReport`` describing the observed cadence.

    Raises:
        ValueError: If ``min_required_hz`` is negative.
    """
    if min_required_hz < 0:
        raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}")

    stamps = [msg.timestamp_us for msg in messages if msg.msg_type == "GPS_INPUT"]
    frames = len(stamps)

    # A span needs at least two frames; otherwise it is reported as 0.
    span_us = stamps[-1] - stamps[0] if frames >= 2 else 0
    # Only a strictly positive span yields a meaningful rate.
    rate_hz = (frames - 1) / (span_us / 1_000_000.0) if span_us > 0 else 0.0

    return GpsInputRateReport(
        frame_count=frames,
        window_us=span_us,
        observed_rate_hz=rate_hz,
        target_rate_hz=target_rate_hz,
        min_required_hz=min_required_hz,
    )
def validate_ek3_src1_posxy(value: int) -> bool:
    """AC-3: report whether the EK3_SRC1_POSXY value is the required one.

    Args:
        value: The parameter value as fetched by the caller.

    Returns:
        ``True`` iff ``value`` equals ``EK3_SRC1_POSXY_REQUIRED``.
    """
    is_required_source = value == EK3_SRC1_POSXY_REQUIRED
    return is_required_source
def evaluate_gps_raw_int_health(
    messages: Iterable[TlogMessage],
    *,
    min_fix_type: int = GPS_RAW_INT_MIN_FIX_TYPE,
    max_eph: int = GPS_RAW_INT_MAX_EPH,
    fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED,
) -> GpsRawIntHealthReport:
    """AC-4: ≥``fraction_required`` of GPS_RAW_INT samples must be healthy.

    A sample is "healthy" iff ``fix_type ≥ min_fix_type`` AND
    ``eph ≤ max_eph``. Both must hold per the spec text.

    Every ``GPS_RAW_INT`` message counts toward the total. A message
    whose ``fix_type`` or ``eph`` field is missing or cannot be
    converted to ``int`` (including a non-finite float) is counted as
    unhealthy rather than aborting the evaluation.

    Args:
        messages: Messages to scan; other message types are ignored.
        min_fix_type: Lowest ``fix_type`` considered healthy.
        max_eph: Highest ``eph`` considered healthy.
        fraction_required: Healthy share needed to pass, in ``[0, 1]``.

    Returns:
        A ``GpsRawIntHealthReport`` with the total and healthy counts.

    Raises:
        ValueError: If ``fraction_required`` is outside ``[0, 1]``.
    """
    if not 0.0 <= fraction_required <= 1.0:
        raise ValueError(
            f"fraction_required must be in [0, 1], got {fraction_required}"
        )
    total = 0
    healthy = 0
    for m in messages:
        if m.msg_type != "GPS_RAW_INT":
            continue
        total += 1
        try:
            fix_type = int(m.fields["fix_type"])  # type: ignore[arg-type]
            eph = int(m.fields["eph"])  # type: ignore[arg-type]
        except (KeyError, TypeError, ValueError, OverflowError):
            # Malformed sample: already counted in ``total``, never healthy.
            # OverflowError covers int(float("inf")), which the other three
            # exception types do not catch.
            continue
        if fix_type >= min_fix_type and eph <= max_eph:
            healthy += 1
    return GpsRawIntHealthReport(
        total_samples=total,
        healthy_samples=healthy,
        fraction_required=fraction_required,
    )
def collect_messages_to_list(messages: Iterable[TlogMessage]) -> list[TlogMessage]:
    """Drain ``messages`` into a new list so it can be analysed repeatedly.

    The scenario reads the tlog once via ``iter_messages`` and then runs
    several analyzers over the result. Because ``iter_messages`` hands
    back a generator that closes its underlying pymavlink connection
    once exhausted, a second pass is only safe over a materialised copy.
    """
    materialised: list[TlogMessage] = []
    materialised.extend(messages)
    return materialised