mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 16:31:14 +00:00
702a0c0ff3
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators: - outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset - blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment; AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas - multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage - fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard - _common.py: derive_rng + tile-manifest reader + tmpfs helpers - injector_fixtures.py: pytest fixtures wired via runner conftest AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors: - anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction, AC-4 monotonicity check, CSV evidence - test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper NotImplementedError (frame_source_replay / fdr_reader / imu_replay) AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84: - estimate_schema.py: AC-1 schema completeness, AC-2 source-label set containment, AC-3 WGS84 range + int32 1e-7 decode - test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario Tests: 248 unit tests pass (+91 vs batch 68). Reports: batch_69_report.md, batch_69_review.md (PASS), cumulative_review_batches_67-69_cycle1_report.md (PASS). Co-authored-by: Cursor <cursoragent@cursor.com>
288 lines
10 KiB
Python
288 lines
10 KiB
Python
"""Anchor-pair detection + drift binning for FT-P-02 (AC-1.3).
|
||
|
||
Consumes a stream of FDR ``source_label`` transitions + position estimates
|
||
and produces:
|
||
|
||
* Anchor pairs: every (visual_propagated | dead_reckoned) → satellite_anchored
|
||
transition is one pair. The pair records the segment's propagated_centre
|
||
immediately before the new anchor, the anchor centre itself, and the
|
||
age of the previous satellite anchor at the moment of the new one.
|
||
|
||
* Drift per pair = geodesic distance (Vincenty / WGS84) between the
|
||
propagated centre and the new anchor centre.
|
||
|
||
* Drift bins by ``last_satellite_anchor_age_ms`` (defaults to the
|
||
spec's {<1 s, 1-3 s, 3-10 s, 10-30 s, >30 s} buckets).
|
||
|
||
* Aggregate pass/fail per AC-1.3:
|
||
- AC-2: ≥95 % of visual-only pairs satisfy drift < 100 m.
|
||
- AC-3: ≥95 % of IMU-fused pairs satisfy drift < 50 m.
|
||
- AC-4: bin medians grow monotonically with age; no >2× jump.
|
||
|
||
The classification (visual-only vs IMU-fused) is purely informational —
|
||
the test code reads it out of the segment's FDR records (any frame with
|
||
``imu_fused=True`` since the prior anchor makes the segment IMU-fused).
|
||
|
||
The helper is **transport-agnostic**: it takes typed FdrEstimate records
|
||
that the per-scenario test produces from the public FDR archive (no SUT
|
||
import). Unit tests construct synthetic streams directly.
|
||
|
||
Public-boundary discipline: this module does NOT import any
|
||
``src/gps_denied_onboard`` symbol.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import statistics
|
||
from dataclasses import dataclass, field
|
||
from typing import Literal, Sequence
|
||
|
||
from .geo import distance_m
|
||
|
||
SourceLabel = Literal["satellite_anchored", "visual_propagated", "dead_reckoned"]
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class FdrEstimate:
|
||
"""One position estimate from the FDR archive (post-flight read).
|
||
|
||
The fields are the public-boundary contract — we never import the
|
||
SUT's ``FdrRecord`` dataclass; we materialise a parallel struct
|
||
from the FDR JSON payload.
|
||
"""
|
||
|
||
monotonic_ms: int
|
||
lat_deg: float
|
||
lon_deg: float
|
||
source_label: SourceLabel
|
||
imu_fused: bool = False
|
||
cov_semi_major_m: float = 0.0
|
||
last_satellite_anchor_age_ms: int = 0
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class AnchorPair:
|
||
"""One (propagated_centre, new_anchor) pair."""
|
||
|
||
segment_first_ms: int
|
||
propagated_centre_ms: int # timestamp of last estimate before anchor
|
||
anchor_ms: int
|
||
propagated_lat_deg: float
|
||
propagated_lon_deg: float
|
||
anchor_lat_deg: float
|
||
anchor_lon_deg: float
|
||
drift_m: float
|
||
last_satellite_anchor_age_ms: int
|
||
imu_fused_segment: bool
|
||
|
||
|
||
# Default bin edges per the spec: {<1 s, 1-3 s, 3-10 s, 10-30 s, >30 s}
|
||
DEFAULT_AGE_BIN_EDGES_MS: tuple[int, ...] = (1_000, 3_000, 10_000, 30_000)
|
||
|
||
|
||
@dataclass
|
||
class DriftBinStats:
|
||
"""Aggregate statistics for one age-bin."""
|
||
|
||
label: str
|
||
count: int = 0
|
||
median_m: float = 0.0
|
||
p95_m: float = 0.0
|
||
drifts_m: list[float] = field(default_factory=list)
|
||
|
||
|
||
@dataclass
|
||
class FtP02Report:
|
||
"""Aggregate report produced by the FT-P-02 scenario."""
|
||
|
||
pairs: list[AnchorPair]
|
||
visual_only_pairs: list[AnchorPair]
|
||
imu_fused_pairs: list[AnchorPair]
|
||
visual_only_pass_fraction: float
|
||
imu_fused_pass_fraction: float
|
||
bin_stats: list[DriftBinStats]
|
||
monotonic_violations: list[str]
|
||
|
||
|
||
def detect_anchor_pairs(stream: Sequence[FdrEstimate]) -> list[AnchorPair]:
|
||
"""Detect every ``visual_propagated|dead_reckoned → satellite_anchored`` transition.
|
||
|
||
Within a single segment (sequence of visual_propagated / dead_reckoned
|
||
estimates), the **propagated_centre** is the estimate immediately
|
||
preceding the next anchor — that's the SUT's last published centre
|
||
before the new anchor pulls it back to ground truth.
|
||
|
||
The "first anchor" of the stream has no predecessor segment and is
|
||
skipped (it is not a pair).
|
||
"""
|
||
pairs: list[AnchorPair] = []
|
||
last_anchor: FdrEstimate | None = None
|
||
current_segment: list[FdrEstimate] = []
|
||
imu_fused_in_segment = False
|
||
|
||
for est in stream:
|
||
if est.source_label == "satellite_anchored":
|
||
if last_anchor is not None and current_segment:
|
||
propagated = current_segment[-1]
|
||
drift = distance_m(
|
||
propagated.lat_deg, propagated.lon_deg,
|
||
est.lat_deg, est.lon_deg,
|
||
)
|
||
pairs.append(
|
||
AnchorPair(
|
||
segment_first_ms=current_segment[0].monotonic_ms,
|
||
propagated_centre_ms=propagated.monotonic_ms,
|
||
anchor_ms=est.monotonic_ms,
|
||
propagated_lat_deg=propagated.lat_deg,
|
||
propagated_lon_deg=propagated.lon_deg,
|
||
anchor_lat_deg=est.lat_deg,
|
||
anchor_lon_deg=est.lon_deg,
|
||
drift_m=drift,
|
||
last_satellite_anchor_age_ms=est.monotonic_ms - last_anchor.monotonic_ms,
|
||
imu_fused_segment=imu_fused_in_segment,
|
||
)
|
||
)
|
||
last_anchor = est
|
||
current_segment = []
|
||
imu_fused_in_segment = False
|
||
else:
|
||
current_segment.append(est)
|
||
if est.imu_fused:
|
||
imu_fused_in_segment = True
|
||
|
||
return pairs
|
||
|
||
|
||
def _bin_label(age_ms: int, edges: tuple[int, ...]) -> str:
|
||
"""Map an age in ms to a human-readable bin label."""
|
||
if age_ms < edges[0]:
|
||
return f"<{edges[0] // 1000}s"
|
||
for i in range(1, len(edges)):
|
||
if age_ms < edges[i]:
|
||
return f"{edges[i - 1] // 1000}-{edges[i] // 1000}s"
|
||
return f">{edges[-1] // 1000}s"
|
||
|
||
|
||
def bin_drifts(
|
||
pairs: Sequence[AnchorPair],
|
||
edges: tuple[int, ...] = DEFAULT_AGE_BIN_EDGES_MS,
|
||
) -> list[DriftBinStats]:
|
||
"""Bin drifts by ``last_satellite_anchor_age_ms``; return per-bin stats."""
|
||
bins: dict[str, list[float]] = {}
|
||
# Pre-create bins in display order so the output is stable.
|
||
labels = [_bin_label(0, edges)]
|
||
labels.extend(f"{edges[i] // 1000}-{edges[i + 1] // 1000}s" for i in range(len(edges) - 1))
|
||
labels.append(f">{edges[-1] // 1000}s")
|
||
for label in labels:
|
||
bins[label] = []
|
||
|
||
for p in pairs:
|
||
bins[_bin_label(p.last_satellite_anchor_age_ms, edges)].append(p.drift_m)
|
||
|
||
stats: list[DriftBinStats] = []
|
||
for label in labels:
|
||
drifts = bins[label]
|
||
if drifts:
|
||
sorted_drifts = sorted(drifts)
|
||
idx95 = max(0, int(round(0.95 * len(sorted_drifts))) - 1)
|
||
stats.append(
|
||
DriftBinStats(
|
||
label=label,
|
||
count=len(drifts),
|
||
median_m=statistics.median(drifts),
|
||
p95_m=sorted_drifts[idx95],
|
||
drifts_m=drifts,
|
||
)
|
||
)
|
||
else:
|
||
stats.append(DriftBinStats(label=label, count=0, median_m=0.0, p95_m=0.0))
|
||
return stats
|
||
|
||
|
||
def check_monotonic(bin_stats: Sequence[DriftBinStats]) -> list[str]:
|
||
"""AC-4: bin medians grow monotonically with age; no >2× jump between
|
||
adjacent populated bins. Returns a list of violation strings (empty
|
||
iff the AC holds).
|
||
"""
|
||
violations: list[str] = []
|
||
populated = [s for s in bin_stats if s.count > 0]
|
||
for prev, nxt in zip(populated, populated[1:]):
|
||
if nxt.median_m < prev.median_m:
|
||
violations.append(
|
||
f"non-monotonic median: bin {prev.label} median {prev.median_m:.2f} m > "
|
||
f"bin {nxt.label} median {nxt.median_m:.2f} m"
|
||
)
|
||
elif prev.median_m > 0 and nxt.median_m > 2 * prev.median_m:
|
||
violations.append(
|
||
f">2x median jump: bin {prev.label} median {prev.median_m:.2f} m → "
|
||
f"bin {nxt.label} median {nxt.median_m:.2f} m"
|
||
)
|
||
return violations
|
||
|
||
|
||
def compute_pass_fraction(pairs: Sequence[AnchorPair], drift_bound_m: float) -> float:
|
||
"""Fraction of pairs whose drift < ``drift_bound_m``. Returns 0.0 for empty."""
|
||
if not pairs:
|
||
return 0.0
|
||
pass_count = sum(1 for p in pairs if p.drift_m < drift_bound_m)
|
||
return pass_count / len(pairs)
|
||
|
||
|
||
def aggregate(
|
||
stream: Sequence[FdrEstimate],
|
||
visual_only_bound_m: float = 100.0,
|
||
imu_fused_bound_m: float = 50.0,
|
||
edges: tuple[int, ...] = DEFAULT_AGE_BIN_EDGES_MS,
|
||
) -> FtP02Report:
|
||
"""End-to-end aggregation: stream → pairs → bins → pass fractions → monotonicity."""
|
||
pairs = detect_anchor_pairs(stream)
|
||
visual_only = [p for p in pairs if not p.imu_fused_segment]
|
||
imu_fused = [p for p in pairs if p.imu_fused_segment]
|
||
bin_stats = bin_drifts(pairs, edges)
|
||
return FtP02Report(
|
||
pairs=pairs,
|
||
visual_only_pairs=visual_only,
|
||
imu_fused_pairs=imu_fused,
|
||
visual_only_pass_fraction=compute_pass_fraction(visual_only, visual_only_bound_m),
|
||
imu_fused_pass_fraction=compute_pass_fraction(imu_fused, imu_fused_bound_m),
|
||
bin_stats=bin_stats,
|
||
monotonic_violations=check_monotonic(bin_stats),
|
||
)
|
||
|
||
|
||
def write_csv_evidence(report: FtP02Report, csv_path) -> None: # type: ignore[no-untyped-def]
|
||
"""Emit one CSV row per anchor pair under ``csv_path`` (FT-P-02 evidence)."""
|
||
import csv as _csv
|
||
|
||
with csv_path.open("w", newline="") as fp:
|
||
writer = _csv.writer(fp, lineterminator="\n")
|
||
writer.writerow(
|
||
[
|
||
"segment_first_ms",
|
||
"propagated_centre_ms",
|
||
"anchor_ms",
|
||
"propagated_lat_deg",
|
||
"propagated_lon_deg",
|
||
"anchor_lat_deg",
|
||
"anchor_lon_deg",
|
||
"drift_m",
|
||
"last_satellite_anchor_age_ms",
|
||
"imu_fused_segment",
|
||
]
|
||
)
|
||
for p in report.pairs:
|
||
writer.writerow(
|
||
[
|
||
p.segment_first_ms,
|
||
p.propagated_centre_ms,
|
||
p.anchor_ms,
|
||
f"{p.propagated_lat_deg:.7f}",
|
||
f"{p.propagated_lon_deg:.7f}",
|
||
f"{p.anchor_lat_deg:.7f}",
|
||
f"{p.anchor_lon_deg:.7f}",
|
||
f"{p.drift_m:.3f}",
|
||
p.last_satellite_anchor_age_ms,
|
||
int(p.imu_fused_segment),
|
||
]
|
||
)
|