mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 12:41:13 +00:00
c6e6cba237
- AZ-414 (FT-P-07 + FT-N-02): sharp_turn_detector helper covering AC-1 (gyro_z run detection + synthetic-overlay fallback), AC-2/AC-3 (FT-N-02 during-turn label + monotonic covariance), AC-4/AC-5/AC-6 (FT-P-07 recovery lag/drift/heading); twin scenario files under positive/ and negative/. - AZ-415 (FT-P-08): multi_segment_evaluator helper + scenario. - AZ-418 (FT-P-10): smoothing_evaluator helper covering AC-1 (raw + smoothed pose pairing), AC-2 (improvement rate >= 0.80), AC-3 (mean improvement >= 5 m); scenario file. - All scenarios skip-gated on upstream frame_source_replay / imu_replay / fdr_reader stubs (auto-activate when AZ-441 + AZ-407 leftovers land). - +68 unit tests; full e2e unit suite: 393 passed. See _docs/03_implementation/batch_71_report.md and _docs/03_implementation/reviews/batch_71_review.md. Co-authored-by: Cursor <cursoragent@cursor.com>
247 lines
7.7 KiB
Python
247 lines
7.7 KiB
Python
"""GTSAM smoothing-loop look-back evaluation for FT-P-10 (AZ-418 / AC-4.5 revised).
|
||
|
||
The SUT exposes (per AC-NEW-3 FDR schema) two record entries per past
|
||
keyframe ``k``:
|
||
|
||
* ``raw_pose_k``: the single-shot pose at the keyframe's first emission.
|
||
* ``smoothed_pose_k``: the iSAM2-converged pose at smoother window exit.
|
||
|
||
This helper pairs them by keyframe id, computes per-keyframe geodesic
|
||
distance vs the Derkachi ``GLOBAL_POSITION_INT`` GT, and reports:
|
||
|
||
* AC-2: ``count(smoothed_error < raw_error) / total_keyframes ≥ 0.80``.
|
||
* AC-3: ``mean(raw_error − smoothed_error) ≥ 5 m`` across all keyframes.
|
||
|
||
Mode B Fact #107 is reflected in the spec: this is an INTERNAL
|
||
improvement metric — NOT FC-side retroactive correction.
|
||
|
||
Public-boundary discipline: this module does NOT import any
|
||
``src/gps_denied_onboard`` symbol.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from statistics import mean
|
||
from typing import Iterable, Mapping, Sequence
|
||
|
||
from .geo import distance_m
|
||
|
||
IMPROVEMENT_RATE_REQUIRED = 0.80
|
||
MEAN_IMPROVEMENT_M_REQUIRED = 5.0
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class GtPose:
|
||
"""One Derkachi GLOBAL_POSITION_INT GT pose."""
|
||
|
||
monotonic_ms: int
|
||
lat_deg: float
|
||
lon_deg: float
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class KeyframePoseRecord:
|
||
"""One FDR pose record for a past keyframe.
|
||
|
||
The scenario builds these from the FDR archive, separating raw vs
|
||
smoothed by the FDR ``payload['pose_kind']`` field.
|
||
"""
|
||
|
||
keyframe_id: int
|
||
pose_kind: str # "raw" | "smoothed"
|
||
monotonic_ms: int
|
||
lat_deg: float
|
||
lon_deg: float
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class KeyframePair:
|
||
"""A matched raw + smoothed pair for one keyframe."""
|
||
|
||
keyframe_id: int
|
||
raw: KeyframePoseRecord
|
||
smoothed: KeyframePoseRecord
|
||
gt: GtPose
|
||
raw_error_m: float
|
||
smoothed_error_m: float
|
||
|
||
@property
|
||
def improvement_m(self) -> float:
|
||
return self.raw_error_m - self.smoothed_error_m
|
||
|
||
@property
|
||
def smoothed_wins(self) -> bool:
|
||
return self.smoothed_error_m < self.raw_error_m
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class SmoothingReport:
|
||
"""Aggregate report consumed by the scenario assertion."""
|
||
|
||
pairs: tuple[KeyframePair, ...]
|
||
improvement_rate: float
|
||
mean_improvement_m: float
|
||
median_improvement_m: float
|
||
rate_required: float = IMPROVEMENT_RATE_REQUIRED
|
||
mean_required: float = MEAN_IMPROVEMENT_M_REQUIRED
|
||
|
||
@property
|
||
def pair_count(self) -> int:
|
||
return len(self.pairs)
|
||
|
||
@property
|
||
def passes_rate(self) -> bool:
|
||
return self.pair_count > 0 and self.improvement_rate >= self.rate_required
|
||
|
||
@property
|
||
def passes_mean(self) -> bool:
|
||
return self.pair_count > 0 and self.mean_improvement_m >= self.mean_required
|
||
|
||
@property
|
||
def passes(self) -> bool:
|
||
return self.passes_rate and self.passes_mean
|
||
|
||
|
||
def pair_records(
|
||
records: Sequence[KeyframePoseRecord],
|
||
) -> dict[int, tuple[KeyframePoseRecord | None, KeyframePoseRecord | None]]:
|
||
"""Group records by ``keyframe_id``; return (raw, smoothed) pairs.
|
||
|
||
Duplicate ``pose_kind`` values for the same keyframe raise
|
||
``ValueError`` — the FDR schema MUST emit exactly one raw + one
|
||
smoothed per past keyframe.
|
||
"""
|
||
by_kf: dict[int, dict[str, KeyframePoseRecord]] = {}
|
||
for r in records:
|
||
if r.pose_kind not in ("raw", "smoothed"):
|
||
raise ValueError(
|
||
f"unknown pose_kind '{r.pose_kind}' for keyframe {r.keyframe_id}"
|
||
)
|
||
bucket = by_kf.setdefault(r.keyframe_id, {})
|
||
if r.pose_kind in bucket:
|
||
raise ValueError(
|
||
f"duplicate {r.pose_kind} pose for keyframe {r.keyframe_id}"
|
||
)
|
||
bucket[r.pose_kind] = r
|
||
return {
|
||
kf: (b.get("raw"), b.get("smoothed"))
|
||
for kf, b in by_kf.items()
|
||
}
|
||
|
||
|
||
def resolve_gt_at(monotonic_ms: int, gt_track: Sequence[GtPose]) -> GtPose:
|
||
"""Find the GT pose nearest to ``monotonic_ms`` (linear scan).
|
||
|
||
The Derkachi GT track is at 10 Hz (100 ms cadence); choosing the
|
||
nearest sample is < 50 ms off and acceptable for the AC-4.5
|
||
measurement which is about decimetre-to-metre improvement deltas.
|
||
"""
|
||
if not gt_track:
|
||
raise ValueError("gt_track is empty")
|
||
best = min(gt_track, key=lambda g: abs(g.monotonic_ms - monotonic_ms))
|
||
return best
|
||
|
||
|
||
def evaluate(
|
||
records: Sequence[KeyframePoseRecord],
|
||
gt_track: Sequence[GtPose],
|
||
) -> SmoothingReport:
|
||
"""Pair raw + smoothed by keyframe, compute errors, aggregate.
|
||
|
||
Keyframes missing either raw or smoothed are excluded from the
|
||
aggregate (with a docstring note); a future tightening could raise
|
||
on missing entries, but the spec text "if only one is present, the
|
||
test fails" applies at the scenario level — we surface it here via
|
||
the empty-pair count.
|
||
"""
|
||
if not gt_track:
|
||
raise ValueError("gt_track must not be empty")
|
||
paired = pair_records(records)
|
||
pairs: list[KeyframePair] = []
|
||
for kf, (raw, smoothed) in sorted(paired.items()):
|
||
if raw is None or smoothed is None:
|
||
continue
|
||
# Use the raw record's monotonic_ms to resolve GT — the keyframe's
|
||
# actual flight time, not the smoothing-window-exit time.
|
||
gt = resolve_gt_at(raw.monotonic_ms, gt_track)
|
||
raw_err = distance_m(gt.lat_deg, gt.lon_deg, raw.lat_deg, raw.lon_deg)
|
||
smoothed_err = distance_m(gt.lat_deg, gt.lon_deg, smoothed.lat_deg, smoothed.lon_deg)
|
||
pairs.append(
|
||
KeyframePair(
|
||
keyframe_id=kf,
|
||
raw=raw,
|
||
smoothed=smoothed,
|
||
gt=gt,
|
||
raw_error_m=raw_err,
|
||
smoothed_error_m=smoothed_err,
|
||
)
|
||
)
|
||
if pairs:
|
||
wins = sum(1 for p in pairs if p.smoothed_wins)
|
||
rate = wins / len(pairs)
|
||
improvements = [p.improvement_m for p in pairs]
|
||
mean_imp = mean(improvements)
|
||
sorted_imps = sorted(improvements)
|
||
n = len(sorted_imps)
|
||
median_imp = (
|
||
sorted_imps[n // 2] if n % 2 == 1
|
||
else (sorted_imps[n // 2 - 1] + sorted_imps[n // 2]) / 2.0
|
||
)
|
||
else:
|
||
rate = 0.0
|
||
mean_imp = 0.0
|
||
median_imp = 0.0
|
||
return SmoothingReport(
|
||
pairs=tuple(pairs),
|
||
improvement_rate=rate,
|
||
mean_improvement_m=mean_imp,
|
||
median_improvement_m=median_imp,
|
||
)
|
||
|
||
|
||
def write_csv_evidence(out_path: Path, report: SmoothingReport) -> Path:
|
||
"""Write FT-P-10 per-keyframe evidence CSV.
|
||
|
||
Header: ``keyframe_id, raw_lat, raw_lon, smoothed_lat, smoothed_lon,
|
||
gt_lat, gt_lon, raw_error_m, smoothed_error_m, improvement_m,
|
||
smoothed_wins``.
|
||
"""
|
||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||
with out_path.open("w", newline="") as fh:
|
||
writer = csv.writer(fh)
|
||
writer.writerow(
|
||
[
|
||
"keyframe_id",
|
||
"raw_lat",
|
||
"raw_lon",
|
||
"smoothed_lat",
|
||
"smoothed_lon",
|
||
"gt_lat",
|
||
"gt_lon",
|
||
"raw_error_m",
|
||
"smoothed_error_m",
|
||
"improvement_m",
|
||
"smoothed_wins",
|
||
]
|
||
)
|
||
for p in report.pairs:
|
||
writer.writerow(
|
||
[
|
||
p.keyframe_id,
|
||
f"{p.raw.lat_deg:.6f}",
|
||
f"{p.raw.lon_deg:.6f}",
|
||
f"{p.smoothed.lat_deg:.6f}",
|
||
f"{p.smoothed.lon_deg:.6f}",
|
||
f"{p.gt.lat_deg:.6f}",
|
||
f"{p.gt.lon_deg:.6f}",
|
||
f"{p.raw_error_m:.3f}",
|
||
f"{p.smoothed_error_m:.3f}",
|
||
f"{p.improvement_m:.3f}",
|
||
"true" if p.smoothed_wins else "false",
|
||
]
|
||
)
|
||
return out_path
|