"""GCS telemetry evaluation for FT-P-12 + FT-P-13 (AZ-420 / AC-6.1, AC-6.2). Two evaluators sourced from the GCS-side ``.tlog`` captured by ``mavproxy-listener`` plus the FDR archive: * **FT-P-12 / AC-6.1**: SUT→GCS summary cadence must land in [1, 2] Hz over the 60 s replay window. The SUT's C8 ``QgcTelemetryAdapter`` pairs ``GLOBAL_POSITION_INT`` + ``NAMED_VALUE_FLOAT`` at the configured ``summary_rate_hz``; we count ``GLOBAL_POSITION_INT`` bursts since the ``NAMED_VALUE_FLOAT`` companion is decorative. * **FT-P-13 / AC-6.2**: GCS-originated ``STATUSTEXT`` carrying an operator re-loc hint: * acknowledgement latency from inject → FDR ``c8.gcs.operator_command`` record must be ≤ 2 s (AC-2); * the next per-frame ``anchor_search_region`` FDR record's centre must move closer to the hinted location than the last pre-hint region (AC-3); * no ``BAD_SIGNATURE`` / ``UNAUTHORIZED`` STATUSTEXT may appear in the rejection window after the hint (AC-4). All inputs are pure iterables / sequences. The tlog ingestion is delegated to ``runner.helpers.mavproxy_tlog_reader.iter_messages`` and the FDR ingestion to ``runner.helpers.fdr_reader.iter_records``. Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import math from dataclasses import dataclass from typing import Iterable, Sequence from .mavproxy_tlog_reader import TlogMessage GCS_SUMMARY_RATE_MIN_HZ = 1.0 GCS_SUMMARY_RATE_MAX_HZ = 2.0 GCS_SUMMARY_POSITION_MSG_TYPE = "GLOBAL_POSITION_INT" GCS_SUMMARY_COMPANION_MSG_TYPE = "NAMED_VALUE_FLOAT" HINT_ACK_MAX_LATENCY_MS = 2000.0 HINT_FDR_KIND = "c8.gcs.operator_command" HINT_REJECTION_STATUSTEXT_TOKENS = ("BAD_SIGNATURE", "UNAUTHORIZED", "REJECTED") ANCHOR_SEARCH_REGION_FDR_KIND = "anchor_search_region" _EARTH_RADIUS_M = 6_371_008.8 # ─────────────────────── FT-P-12 / AC-6.1 ─────────────────────── @dataclass(frozen=True) class GcsSummaryRateReport: """AC-6.1: SUT→GCS summary cadence over the replay window.""" total_summary_messages: int window_us: int observed_rate_hz: float min_required_hz: float = GCS_SUMMARY_RATE_MIN_HZ max_required_hz: float = GCS_SUMMARY_RATE_MAX_HZ @property def passes(self) -> bool: if self.window_us <= 0: return False return self.min_required_hz <= self.observed_rate_hz <= self.max_required_hz def compute_gcs_summary_rate( messages: Iterable[TlogMessage], *, position_msg_type: str = GCS_SUMMARY_POSITION_MSG_TYPE, min_required_hz: float = GCS_SUMMARY_RATE_MIN_HZ, max_required_hz: float = GCS_SUMMARY_RATE_MAX_HZ, ) -> GcsSummaryRateReport: """AC-6.1: rate of ``GLOBAL_POSITION_INT`` messages emitted to the GCS. Each SUT→GCS summary "burst" is one ``GLOBAL_POSITION_INT`` paired with one ``NAMED_VALUE_FLOAT(horiz_m)`` per the C8 ``QgcTelemetryAdapter`` implementation; only the position message is counted to avoid double-counting the decorative companion. Rate is computed over the (first, last) timestamp span — i.e., ``(N-1) / window_seconds`` — to match ``compute_gps_input_rate`` in ``ap_contract_evaluator``. """ if min_required_hz < 0: raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}") if max_required_hz < min_required_hz: raise ValueError( f"max_required_hz ({max_required_hz}) must be ≥ " f"min_required_hz ({min_required_hz})" ) timestamps = [m.timestamp_us for m in messages if m.msg_type == position_msg_type] if len(timestamps) < 2: return GcsSummaryRateReport( total_summary_messages=len(timestamps), window_us=0, observed_rate_hz=0.0, min_required_hz=min_required_hz, max_required_hz=max_required_hz, ) window_us = timestamps[-1] - timestamps[0] if window_us <= 0: return GcsSummaryRateReport( total_summary_messages=len(timestamps), window_us=window_us, observed_rate_hz=0.0, min_required_hz=min_required_hz, max_required_hz=max_required_hz, ) observed_hz = (len(timestamps) - 1) / (window_us / 1_000_000.0) return GcsSummaryRateReport( total_summary_messages=len(timestamps), window_us=window_us, observed_rate_hz=observed_hz, min_required_hz=min_required_hz, max_required_hz=max_required_hz, ) # ─────────────────────── FT-P-13 / AC-6.2 ─────────────────────── @dataclass(frozen=True) class InboundHint: """A GCS-originated re-loc hint observed inbound on the SUT side. Sourced from a ``STATUSTEXT`` MAVLink message captured in the GCS tlog. ``hint_text`` is the raw payload (the operator's hint string). """ inject_timestamp_us: int hint_text: str @dataclass(frozen=True) class FdrCommandAck: """An FDR record acknowledging the inbound operator command. Sourced from ``kind='log'`` records whose payload ``kv.kind`` equals ``c8.gcs.operator_command`` (the kind the QGC adapter emits when it translates an inbound command into an ``OperatorCommand`` DTO). """ ack_timestamp_us: int payload_kv: dict def correlate_hint_acks( hints: Sequence[InboundHint], acks: Sequence[FdrCommandAck], ) -> "HintAckReport": """AC-6.2 / AC-2: pair each hint with its earliest succeeding ack. Pairing is greedy in injection order. A given FDR ack can match at most one hint; an ack whose timestamp precedes every hint is ignored (it cannot be an ack for those hints). """ sorted_acks = sorted(acks, key=lambda a: a.ack_timestamp_us) cursor = 0 pairs: list[tuple[InboundHint, FdrCommandAck | None]] = [] for hint in hints: match: FdrCommandAck | None = None while cursor < len(sorted_acks): ack = sorted_acks[cursor] if ack.ack_timestamp_us < hint.inject_timestamp_us: cursor += 1 continue match = ack cursor += 1 break pairs.append((hint, match)) latencies: list[float | None] = [] for hint, ack in pairs: if ack is None: latencies.append(None) else: latencies.append((ack.ack_timestamp_us - hint.inject_timestamp_us) / 1000.0) return HintAckReport( hints=tuple(hints), acks=tuple(sorted_acks), latencies_ms=tuple(latencies), ) @dataclass(frozen=True) class HintAckReport: """AC-2 of FT-P-13: per-hint inject→ack latency.""" hints: tuple[InboundHint, ...] acks: tuple[FdrCommandAck, ...] latencies_ms: tuple[float | None, ...] max_required_ms: float = HINT_ACK_MAX_LATENCY_MS @property def acked_count(self) -> int: return sum(1 for latency in self.latencies_ms if latency is not None) @property def passes(self) -> bool: if not self.hints: return False return all( latency is not None and latency <= self.max_required_ms for latency in self.latencies_ms ) @dataclass(frozen=True) class SearchRegionRecord: """One ``anchor_search_region`` FDR record. Schema (AC-NEW-3 family): per-frame record of the satellite-anchor search region the C2 backbone is currently scanning. Centre is in WGS84 degrees; radius is in metres. """ monotonic_us: int centre_lat_deg: float centre_lon_deg: float radius_m: float def haversine_distance_m( lat_a_deg: float, lon_a_deg: float, lat_b_deg: float, lon_b_deg: float ) -> float: """Great-circle distance between two WGS84 points in metres. Uses the spherical haversine formula with the mean Earth radius. Accurate to ≪1 m for the sub-100 km separations FT-P-13 cares about. """ phi_a = math.radians(lat_a_deg) phi_b = math.radians(lat_b_deg) dphi = math.radians(lat_b_deg - lat_a_deg) dlam = math.radians(lon_b_deg - lon_a_deg) a = math.sin(dphi / 2) ** 2 + math.cos(phi_a) * math.cos(phi_b) * math.sin(dlam / 2) ** 2 c = 2 * math.asin(min(1.0, math.sqrt(a))) return _EARTH_RADIUS_M * c @dataclass(frozen=True) class SearchRegionShiftReport: """AC-3 of FT-P-13: did the search region shift toward the hint?""" hint_lat_deg: float hint_lon_deg: float region_before: SearchRegionRecord | None region_after: SearchRegionRecord | None distance_before_m: float | None distance_after_m: float | None @property def passes(self) -> bool: if self.region_after is None or self.distance_after_m is None: return False if self.region_before is None or self.distance_before_m is None: return True return self.distance_after_m < self.distance_before_m def evaluate_search_region_shift( regions: Sequence[SearchRegionRecord], hint_inject_timestamp_us: int, hint_lat_deg: float, hint_lon_deg: float, ) -> SearchRegionShiftReport: """AC-3: compare the last pre-hint region to the first post-hint region. The "shift toward the hint" signal is positive iff the first region observed AFTER ``hint_inject_timestamp_us`` is closer to ``(hint_lat_deg, hint_lon_deg)`` than the last region observed BEFORE the inject. If no pre-hint region exists, any post-hint region counts as a pass (the bias was set before the C2 backbone had a chance to publish anything). """ region_before: SearchRegionRecord | None = None region_after: SearchRegionRecord | None = None for region in regions: if region.monotonic_us < hint_inject_timestamp_us: region_before = region # keep moving forward to find the last pre-hint elif region_after is None: region_after = region distance_before = ( haversine_distance_m( region_before.centre_lat_deg, region_before.centre_lon_deg, hint_lat_deg, hint_lon_deg, ) if region_before is not None else None ) distance_after = ( haversine_distance_m( region_after.centre_lat_deg, region_after.centre_lon_deg, hint_lat_deg, hint_lon_deg, ) if region_after is not None else None ) return SearchRegionShiftReport( hint_lat_deg=hint_lat_deg, hint_lon_deg=hint_lon_deg, region_before=region_before, region_after=region_after, distance_before_m=distance_before, distance_after_m=distance_after, ) @dataclass(frozen=True) class HintRejectionReport: """AC-4 of FT-P-13: no security/auth rejection of the well-formed hint.""" inject_timestamp_us: int window_us: int rejection_count: int rejection_texts: tuple[str, ...] @property def passes(self) -> bool: return self.rejection_count == 0 def detect_hint_rejection( messages: Iterable[TlogMessage], inject_timestamp_us: int, *, window_us: int = int(HINT_ACK_MAX_LATENCY_MS * 1000.0), rejection_tokens: Sequence[str] = HINT_REJECTION_STATUSTEXT_TOKENS, ) -> HintRejectionReport: """AC-4: scan ``STATUSTEXT`` in the post-inject window for rejection markers. A rejection is any ``STATUSTEXT`` whose payload ``text`` field (case insensitive) contains any of ``rejection_tokens``. The window opens at the inject timestamp and closes ``window_us`` later — beyond that a rejection cannot be causally tied to this hint. """ if window_us <= 0: raise ValueError(f"window_us must be > 0, got {window_us}") window_end = inject_timestamp_us + window_us tokens_upper = tuple(token.upper() for token in rejection_tokens) rejection_texts: list[str] = [] for msg in messages: if msg.msg_type != "STATUSTEXT": continue if not (inject_timestamp_us <= msg.timestamp_us <= window_end): continue text = str(msg.fields.get("text", "")).upper() if any(token in text for token in tokens_upper): rejection_texts.append(str(msg.fields.get("text", ""))) return HintRejectionReport( inject_timestamp_us=inject_timestamp_us, window_us=window_us, rejection_count=len(rejection_texts), rejection_texts=tuple(rejection_texts), ) # ─────────────────────── tlog→hint adapter ─────────────────────── def extract_inbound_hints( messages: Iterable[TlogMessage], *, hint_prefix: str = "RELOC:", ) -> list[InboundHint]: """Extract operator-injected reloc-hint STATUSTEXTs from the tlog. The test fixture builder injects ``STATUSTEXT`` messages whose payload ``text`` begins with ``hint_prefix`` (default ``"RELOC:"``) followed by a comma-separated payload (e.g. ``"RELOC:50.0,36.0,200"`` encoding lat,lon,radius_m). The exact payload shape is not interpreted here — that belongs to the scenario test. We only identify which STATUSTEXTs are hints so the FDR correlator knows when the operator pressed "send". """ out: list[InboundHint] = [] for msg in messages: if msg.msg_type != "STATUSTEXT": continue text = str(msg.fields.get("text", "")) if not text.startswith(hint_prefix): continue out.append(InboundHint(inject_timestamp_us=msg.timestamp_us, hint_text=text)) return out def parse_reloc_payload(hint_text: str, *, hint_prefix: str = "RELOC:") -> tuple[float, float, float]: """Parse ``RELOC:,,`` into ``(lat, lon, radius)``. Raises ``ValueError`` on malformed payload — scenarios should let that surface so the run fails loudly rather than silently scoring AC-3 against garbage coordinates. """ if not hint_text.startswith(hint_prefix): raise ValueError( f"hint text does not start with {hint_prefix!r}: {hint_text!r}" ) body = hint_text[len(hint_prefix):] parts = body.split(",") if len(parts) != 3: raise ValueError( f"hint payload must have 3 comma-separated fields " f"(lat,lon,radius_m); got {len(parts)}: {body!r}" ) try: lat = float(parts[0]) lon = float(parts[1]) radius_m = float(parts[2]) except ValueError as exc: raise ValueError(f"hint payload fields must be floats: {body!r}") from exc return (lat, lon, radius_m) def collect_messages_to_list(messages: Iterable[TlogMessage]) -> list[TlogMessage]: """Materialise an iterator into a list — convenience for multi-pass eval. Mirrors ``ap_contract_evaluator.collect_messages_to_list``: scenarios parse the tlog once via ``iter_messages`` and run multiple analyzers over the result. """ return list(messages)