"""Anchor-pair detection + drift binning for FT-P-02 (AC-1.3). Consumes a stream of FDR ``source_label`` transitions + position estimates and produces: * Anchor pairs: every (visual_propagated | dead_reckoned) → satellite_anchored transition is one pair. The pair records the segment's propagated_centre immediately before the new anchor, the anchor centre itself, and the age of the previous satellite anchor at the moment of the new one. * Drift per pair = geodesic distance (Vincenty / WGS84) between the propagated centre and the new anchor centre. * Drift bins by ``last_satellite_anchor_age_ms`` (defaults to the spec's {<1 s, 1-3 s, 3-10 s, 10-30 s, >30 s} buckets). * Aggregate pass/fail per AC-1.3: - AC-2: ≥95 % of visual-only pairs satisfy drift < 100 m. - AC-3: ≥95 % of IMU-fused pairs satisfy drift < 50 m. - AC-4: bin medians grow monotonically with age; no >2× jump. The classification (visual-only vs IMU-fused) is purely informational — the test code reads it out of the segment's FDR records (any frame with ``imu_fused=True`` since the prior anchor makes the segment IMU-fused). The helper is **transport-agnostic**: it takes typed FdrEstimate records that the per-scenario test produces from the public FDR archive (no SUT import). Unit tests construct synthetic streams directly. Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import statistics from dataclasses import dataclass, field from typing import Literal, Sequence from .geo import distance_m SourceLabel = Literal["satellite_anchored", "visual_propagated", "dead_reckoned"] @dataclass(frozen=True) class FdrEstimate: """One position estimate from the FDR archive (post-flight read). The fields are the public-boundary contract — we never import the SUT's ``FdrRecord`` dataclass; we materialise a parallel struct from the FDR JSON payload. """ monotonic_ms: int lat_deg: float lon_deg: float source_label: SourceLabel imu_fused: bool = False cov_semi_major_m: float = 0.0 last_satellite_anchor_age_ms: int = 0 @dataclass(frozen=True) class AnchorPair: """One (propagated_centre, new_anchor) pair.""" segment_first_ms: int propagated_centre_ms: int # timestamp of last estimate before anchor anchor_ms: int propagated_lat_deg: float propagated_lon_deg: float anchor_lat_deg: float anchor_lon_deg: float drift_m: float last_satellite_anchor_age_ms: int imu_fused_segment: bool # Default bin edges per the spec: {<1 s, 1-3 s, 3-10 s, 10-30 s, >30 s} DEFAULT_AGE_BIN_EDGES_MS: tuple[int, ...] = (1_000, 3_000, 10_000, 30_000) @dataclass class DriftBinStats: """Aggregate statistics for one age-bin.""" label: str count: int = 0 median_m: float = 0.0 p95_m: float = 0.0 drifts_m: list[float] = field(default_factory=list) @dataclass class FtP02Report: """Aggregate report produced by the FT-P-02 scenario.""" pairs: list[AnchorPair] visual_only_pairs: list[AnchorPair] imu_fused_pairs: list[AnchorPair] visual_only_pass_fraction: float imu_fused_pass_fraction: float bin_stats: list[DriftBinStats] monotonic_violations: list[str] def detect_anchor_pairs(stream: Sequence[FdrEstimate]) -> list[AnchorPair]: """Detect every ``visual_propagated|dead_reckoned → satellite_anchored`` transition. Within a single segment (sequence of visual_propagated / dead_reckoned estimates), the **propagated_centre** is the estimate immediately preceding the next anchor — that's the SUT's last published centre before the new anchor pulls it back to ground truth. The "first anchor" of the stream has no predecessor segment and is skipped (it is not a pair). """ pairs: list[AnchorPair] = [] last_anchor: FdrEstimate | None = None current_segment: list[FdrEstimate] = [] imu_fused_in_segment = False for est in stream: if est.source_label == "satellite_anchored": if last_anchor is not None and current_segment: propagated = current_segment[-1] drift = distance_m( propagated.lat_deg, propagated.lon_deg, est.lat_deg, est.lon_deg, ) pairs.append( AnchorPair( segment_first_ms=current_segment[0].monotonic_ms, propagated_centre_ms=propagated.monotonic_ms, anchor_ms=est.monotonic_ms, propagated_lat_deg=propagated.lat_deg, propagated_lon_deg=propagated.lon_deg, anchor_lat_deg=est.lat_deg, anchor_lon_deg=est.lon_deg, drift_m=drift, last_satellite_anchor_age_ms=est.monotonic_ms - last_anchor.monotonic_ms, imu_fused_segment=imu_fused_in_segment, ) ) last_anchor = est current_segment = [] imu_fused_in_segment = False else: current_segment.append(est) if est.imu_fused: imu_fused_in_segment = True return pairs def _bin_label(age_ms: int, edges: tuple[int, ...]) -> str: """Map an age in ms to a human-readable bin label.""" if age_ms < edges[0]: return f"<{edges[0] // 1000}s" for i in range(1, len(edges)): if age_ms < edges[i]: return f"{edges[i - 1] // 1000}-{edges[i] // 1000}s" return f">{edges[-1] // 1000}s" def bin_drifts( pairs: Sequence[AnchorPair], edges: tuple[int, ...] = DEFAULT_AGE_BIN_EDGES_MS, ) -> list[DriftBinStats]: """Bin drifts by ``last_satellite_anchor_age_ms``; return per-bin stats.""" bins: dict[str, list[float]] = {} # Pre-create bins in display order so the output is stable. labels = [_bin_label(0, edges)] labels.extend(f"{edges[i] // 1000}-{edges[i + 1] // 1000}s" for i in range(len(edges) - 1)) labels.append(f">{edges[-1] // 1000}s") for label in labels: bins[label] = [] for p in pairs: bins[_bin_label(p.last_satellite_anchor_age_ms, edges)].append(p.drift_m) stats: list[DriftBinStats] = [] for label in labels: drifts = bins[label] if drifts: sorted_drifts = sorted(drifts) idx95 = max(0, int(round(0.95 * len(sorted_drifts))) - 1) stats.append( DriftBinStats( label=label, count=len(drifts), median_m=statistics.median(drifts), p95_m=sorted_drifts[idx95], drifts_m=drifts, ) ) else: stats.append(DriftBinStats(label=label, count=0, median_m=0.0, p95_m=0.0)) return stats def check_monotonic(bin_stats: Sequence[DriftBinStats]) -> list[str]: """AC-4: bin medians grow monotonically with age; no >2× jump between adjacent populated bins. Returns a list of violation strings (empty iff the AC holds). """ violations: list[str] = [] populated = [s for s in bin_stats if s.count > 0] for prev, nxt in zip(populated, populated[1:]): if nxt.median_m < prev.median_m: violations.append( f"non-monotonic median: bin {prev.label} median {prev.median_m:.2f} m > " f"bin {nxt.label} median {nxt.median_m:.2f} m" ) elif prev.median_m > 0 and nxt.median_m > 2 * prev.median_m: violations.append( f">2x median jump: bin {prev.label} median {prev.median_m:.2f} m → " f"bin {nxt.label} median {nxt.median_m:.2f} m" ) return violations def compute_pass_fraction(pairs: Sequence[AnchorPair], drift_bound_m: float) -> float: """Fraction of pairs whose drift < ``drift_bound_m``. Returns 0.0 for empty.""" if not pairs: return 0.0 pass_count = sum(1 for p in pairs if p.drift_m < drift_bound_m) return pass_count / len(pairs) def aggregate( stream: Sequence[FdrEstimate], visual_only_bound_m: float = 100.0, imu_fused_bound_m: float = 50.0, edges: tuple[int, ...] = DEFAULT_AGE_BIN_EDGES_MS, ) -> FtP02Report: """End-to-end aggregation: stream → pairs → bins → pass fractions → monotonicity.""" pairs = detect_anchor_pairs(stream) visual_only = [p for p in pairs if not p.imu_fused_segment] imu_fused = [p for p in pairs if p.imu_fused_segment] bin_stats = bin_drifts(pairs, edges) return FtP02Report( pairs=pairs, visual_only_pairs=visual_only, imu_fused_pairs=imu_fused, visual_only_pass_fraction=compute_pass_fraction(visual_only, visual_only_bound_m), imu_fused_pass_fraction=compute_pass_fraction(imu_fused, imu_fused_bound_m), bin_stats=bin_stats, monotonic_violations=check_monotonic(bin_stats), ) def write_csv_evidence(report: FtP02Report, csv_path) -> None: # type: ignore[no-untyped-def] """Emit one CSV row per anchor pair under ``csv_path`` (FT-P-02 evidence).""" import csv as _csv with csv_path.open("w", newline="") as fp: writer = _csv.writer(fp, lineterminator="\n") writer.writerow( [ "segment_first_ms", "propagated_centre_ms", "anchor_ms", "propagated_lat_deg", "propagated_lon_deg", "anchor_lat_deg", "anchor_lon_deg", "drift_m", "last_satellite_anchor_age_ms", "imu_fused_segment", ] ) for p in report.pairs: writer.writerow( [ p.segment_first_ms, p.propagated_centre_ms, p.anchor_ms, f"{p.propagated_lat_deg:.7f}", f"{p.propagated_lon_deg:.7f}", f"{p.anchor_lat_deg:.7f}", f"{p.anchor_lon_deg:.7f}", f"{p.drift_m:.3f}", p.last_satellite_anchor_age_ms, int(p.imu_fused_segment), ] )