Files
gps-denied-onboard/e2e/runner/helpers/smoothing_evaluator.py
T
Oleksandr Bezdieniezhnykh c6e6cba237 [AZ-414] [AZ-415] [AZ-418] Test batch 71: sharp turn + multi-segment + smoothing
- AZ-414 (FT-P-07 + FT-N-02): sharp_turn_detector helper covering
  AC-1 (gyro_z run detection + synthetic-overlay fallback),
  AC-2/AC-3 (FT-N-02 during-turn label + monotonic covariance),
  AC-4/AC-5/AC-6 (FT-P-07 recovery lag/drift/heading); twin scenario
  files under positive/ and negative/.
- AZ-415 (FT-P-08): multi_segment_evaluator helper + scenario.
- AZ-418 (FT-P-10): smoothing_evaluator helper covering AC-1 (raw +
  smoothed pose pairing), AC-2 (improvement rate >= 0.80), AC-3
  (mean improvement >= 5 m); scenario file.
- All scenarios skip-gated on upstream frame_source_replay /
  imu_replay / fdr_reader stubs (auto-activate when AZ-441 + AZ-407
  leftovers land).
- +68 unit tests; full e2e unit suite: 393 passed.

See _docs/03_implementation/batch_71_report.md and
_docs/03_implementation/reviews/batch_71_review.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 07:12:24 +03:00

247 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""GTSAM smoothing-loop look-back evaluation for FT-P-10 (AZ-418 / AC-4.5 revised).
The SUT exposes (per AC-NEW-3 FDR schema) two record entries per past
keyframe ``k``:
* ``raw_pose_k``: the single-shot pose at the keyframe's first emission.
* ``smoothed_pose_k``: the iSAM2-converged pose at smoother window exit.
This helper pairs them by keyframe id, computes per-keyframe geodesic
distance vs the Derkachi ``GLOBAL_POSITION_INT`` GT, and reports:
* AC-2: ``count(smoothed_error < raw_error) / total_keyframes ≥ 0.80``.
* AC-3: ``mean(raw_error smoothed_error) ≥ 5 m`` across all keyframes.
Mode B Fact #107 is reflected in the spec: this is an INTERNAL
improvement metric — NOT FC-side retroactive correction.
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from statistics import mean
from typing import Iterable, Mapping, Sequence
from .geo import distance_m
IMPROVEMENT_RATE_REQUIRED = 0.80
MEAN_IMPROVEMENT_M_REQUIRED = 5.0
@dataclass(frozen=True)
class GtPose:
"""One Derkachi GLOBAL_POSITION_INT GT pose."""
monotonic_ms: int
lat_deg: float
lon_deg: float
@dataclass(frozen=True)
class KeyframePoseRecord:
"""One FDR pose record for a past keyframe.
The scenario builds these from the FDR archive, separating raw vs
smoothed by the FDR ``payload['pose_kind']`` field.
"""
keyframe_id: int
pose_kind: str # "raw" | "smoothed"
monotonic_ms: int
lat_deg: float
lon_deg: float
@dataclass(frozen=True)
class KeyframePair:
"""A matched raw + smoothed pair for one keyframe."""
keyframe_id: int
raw: KeyframePoseRecord
smoothed: KeyframePoseRecord
gt: GtPose
raw_error_m: float
smoothed_error_m: float
@property
def improvement_m(self) -> float:
return self.raw_error_m - self.smoothed_error_m
@property
def smoothed_wins(self) -> bool:
return self.smoothed_error_m < self.raw_error_m
@dataclass(frozen=True)
class SmoothingReport:
"""Aggregate report consumed by the scenario assertion."""
pairs: tuple[KeyframePair, ...]
improvement_rate: float
mean_improvement_m: float
median_improvement_m: float
rate_required: float = IMPROVEMENT_RATE_REQUIRED
mean_required: float = MEAN_IMPROVEMENT_M_REQUIRED
@property
def pair_count(self) -> int:
return len(self.pairs)
@property
def passes_rate(self) -> bool:
return self.pair_count > 0 and self.improvement_rate >= self.rate_required
@property
def passes_mean(self) -> bool:
return self.pair_count > 0 and self.mean_improvement_m >= self.mean_required
@property
def passes(self) -> bool:
return self.passes_rate and self.passes_mean
def pair_records(
records: Sequence[KeyframePoseRecord],
) -> dict[int, tuple[KeyframePoseRecord | None, KeyframePoseRecord | None]]:
"""Group records by ``keyframe_id``; return (raw, smoothed) pairs.
Duplicate ``pose_kind`` values for the same keyframe raise
``ValueError`` — the FDR schema MUST emit exactly one raw + one
smoothed per past keyframe.
"""
by_kf: dict[int, dict[str, KeyframePoseRecord]] = {}
for r in records:
if r.pose_kind not in ("raw", "smoothed"):
raise ValueError(
f"unknown pose_kind '{r.pose_kind}' for keyframe {r.keyframe_id}"
)
bucket = by_kf.setdefault(r.keyframe_id, {})
if r.pose_kind in bucket:
raise ValueError(
f"duplicate {r.pose_kind} pose for keyframe {r.keyframe_id}"
)
bucket[r.pose_kind] = r
return {
kf: (b.get("raw"), b.get("smoothed"))
for kf, b in by_kf.items()
}
def resolve_gt_at(monotonic_ms: int, gt_track: Sequence[GtPose]) -> GtPose:
"""Find the GT pose nearest to ``monotonic_ms`` (linear scan).
The Derkachi GT track is at 10 Hz (100 ms cadence); choosing the
nearest sample is < 50 ms off and acceptable for the AC-4.5
measurement which is about decimetre-to-metre improvement deltas.
"""
if not gt_track:
raise ValueError("gt_track is empty")
best = min(gt_track, key=lambda g: abs(g.monotonic_ms - monotonic_ms))
return best
def evaluate(
records: Sequence[KeyframePoseRecord],
gt_track: Sequence[GtPose],
) -> SmoothingReport:
"""Pair raw + smoothed by keyframe, compute errors, aggregate.
Keyframes missing either raw or smoothed are excluded from the
aggregate (with a docstring note); a future tightening could raise
on missing entries, but the spec text "if only one is present, the
test fails" applies at the scenario level — we surface it here via
the empty-pair count.
"""
if not gt_track:
raise ValueError("gt_track must not be empty")
paired = pair_records(records)
pairs: list[KeyframePair] = []
for kf, (raw, smoothed) in sorted(paired.items()):
if raw is None or smoothed is None:
continue
# Use the raw record's monotonic_ms to resolve GT — the keyframe's
# actual flight time, not the smoothing-window-exit time.
gt = resolve_gt_at(raw.monotonic_ms, gt_track)
raw_err = distance_m(gt.lat_deg, gt.lon_deg, raw.lat_deg, raw.lon_deg)
smoothed_err = distance_m(gt.lat_deg, gt.lon_deg, smoothed.lat_deg, smoothed.lon_deg)
pairs.append(
KeyframePair(
keyframe_id=kf,
raw=raw,
smoothed=smoothed,
gt=gt,
raw_error_m=raw_err,
smoothed_error_m=smoothed_err,
)
)
if pairs:
wins = sum(1 for p in pairs if p.smoothed_wins)
rate = wins / len(pairs)
improvements = [p.improvement_m for p in pairs]
mean_imp = mean(improvements)
sorted_imps = sorted(improvements)
n = len(sorted_imps)
median_imp = (
sorted_imps[n // 2] if n % 2 == 1
else (sorted_imps[n // 2 - 1] + sorted_imps[n // 2]) / 2.0
)
else:
rate = 0.0
mean_imp = 0.0
median_imp = 0.0
return SmoothingReport(
pairs=tuple(pairs),
improvement_rate=rate,
mean_improvement_m=mean_imp,
median_improvement_m=median_imp,
)
def write_csv_evidence(out_path: Path, report: SmoothingReport) -> Path:
"""Write FT-P-10 per-keyframe evidence CSV.
Header: ``keyframe_id, raw_lat, raw_lon, smoothed_lat, smoothed_lon,
gt_lat, gt_lon, raw_error_m, smoothed_error_m, improvement_m,
smoothed_wins``.
"""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"keyframe_id",
"raw_lat",
"raw_lon",
"smoothed_lat",
"smoothed_lon",
"gt_lat",
"gt_lon",
"raw_error_m",
"smoothed_error_m",
"improvement_m",
"smoothed_wins",
]
)
for p in report.pairs:
writer.writerow(
[
p.keyframe_id,
f"{p.raw.lat_deg:.6f}",
f"{p.raw.lon_deg:.6f}",
f"{p.smoothed.lat_deg:.6f}",
f"{p.smoothed.lon_deg:.6f}",
f"{p.gt.lat_deg:.6f}",
f"{p.gt.lon_deg:.6f}",
f"{p.raw_error_m:.3f}",
f"{p.smoothed_error_m:.3f}",
f"{p.improvement_m:.3f}",
"true" if p.smoothed_wins else "false",
]
)
return out_path