[AZ-409] [AZ-412] [AZ-413] Batch 70: FT-P-01/04/05/06 scenarios

AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy:
- accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts
- test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay
  + sitl_observer NotImplementedError; AC-4 timeout discipline

AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments:
- registration_classifier.py: accel-derived attitude + overlap heuristic
  + success ratio with AC-3 sharp-turn exclusion
- test_ft_p_04_derkachi_f2f_registration.py: scenario gated on
  frame_source_replay + imu_replay + fdr_reader

AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets:
- mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile
  via numpy linear interp + combined report
- test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses
  accuracy_evaluator for geodesic join
- test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV
  evidence; skips when either upstream CSV missing

Tests: 325 unit tests pass (+77 vs batch 69).
Reports: batch_70_report.md, batch_70_review.md (PASS).
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-16 18:10:46 +03:00
parent 702a0c0ff3
commit 29ac16cfcb
17 changed files with 3013 additions and 1 deletions
+256
View File
@@ -0,0 +1,256 @@
"""Per-image accuracy evaluation for FT-P-01 (AZ-409 — AC-1.1, AC-1.2).
Consumes a list of ``(image_id, est_lat, est_lon)`` estimates produced by
the SUT during a 60-image still-image push, joins against the ground-truth
``coordinates.csv`` shipped with the project, computes Vincenty geodesic
distance per image, and reports the AC-2 / AC-3 pass-counts.
The helper is **transport-agnostic**: the scenario test reads the per-image
estimates from the SITL observer (or post-run FDR archive) and hands a
typed list to ``evaluate()`` — no SUT import.
The pass-count thresholds come from the spec's
``expected_results/results_report.md`` Pass/Fail Rules:
* AC-2 (50 m budget): ≥48 / 60 images pass (80 %).
* AC-3 (20 m budget): ≥30 / 60 images pass (50 %).
Timeout discipline (AC-4): when the SITL listener times out for an image,
the scenario passes ``est_lat = est_lon = float('inf')``; ``evaluate()``
records ``error_m = inf``, ``pass_50m = False``, ``pass_20m = False`` for
that image. The aggregate may still pass if other images carry the count.
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
from .geo import distance_m
PASS_COUNT_50M_REQUIRED = 48
PASS_COUNT_20M_REQUIRED = 30
TOTAL_IMAGES_REQUIRED = 60
@dataclass(frozen=True)
class GtCoordinate:
"""Ground-truth WGS84 frame-center coordinate for one still image."""
image_id: str
lat_deg: float
lon_deg: float
@dataclass(frozen=True)
class EstimateInput:
"""One outbound estimate observed at the SITL listener.
For a timed-out image (no message received within the scenario's 5 s
budget) the scenario passes ``est_lat = est_lon = float('inf')``;
``evaluate()`` records ``error_m = inf`` and both pass flags False.
"""
image_id: str
est_lat_deg: float
est_lon_deg: float
@dataclass(frozen=True)
class PerImageResult:
"""Per-image evaluation row written to ``ft-p-01.csv``."""
image_id: str
gt_lat: float
gt_lon: float
est_lat: float
est_lon: float
error_m: float
pass_50m: bool
pass_20m: bool
@dataclass(frozen=True)
class AggregateReport:
"""Aggregate pass-count over a 60-image run; drives the scenario assertion."""
total_images: int
pass_count_50m: int
pass_count_20m: int
timeout_count: int
pass_50m_required: int = PASS_COUNT_50M_REQUIRED
pass_20m_required: int = PASS_COUNT_20M_REQUIRED
@property
def pass_ac2(self) -> bool:
"""AC-2: ≥48 / 60 pass the 50 m budget."""
return self.pass_count_50m >= self.pass_50m_required
@property
def pass_ac3(self) -> bool:
"""AC-3: ≥30 / 60 pass the 20 m budget."""
return self.pass_count_20m >= self.pass_20m_required
@property
def overall_pass(self) -> bool:
"""Scenario passes iff both AC-2 and AC-3 hold."""
return self.pass_ac2 and self.pass_ac3
def load_gt_coordinates(csv_path: Path) -> list[GtCoordinate]:
"""Parse the project's ``coordinates.csv``.
Header format: ``image, lat, lon`` (with the project's whitespace
around commas — tolerated).
"""
if not csv_path.exists():
raise FileNotFoundError(
f"coordinates.csv not found at {csv_path} — check the bind-mount or repo path"
)
rows: list[GtCoordinate] = []
with csv_path.open() as fh:
reader = csv.reader(fh)
header = next(reader)
normalised_header = [c.strip() for c in header]
expected = ["image", "lat", "lon"]
if normalised_header != expected:
raise ValueError(
f"coordinates.csv header mismatch: expected {expected}, got {normalised_header}"
)
for raw in reader:
if not raw:
continue
image_id, lat_str, lon_str = (c.strip() for c in raw)
rows.append(
GtCoordinate(
image_id=image_id,
lat_deg=float(lat_str),
lon_deg=float(lon_str),
)
)
return rows
def _is_timeout(value: float) -> bool:
"""An est_lat or est_lon of inf marks an AC-4 timeout."""
return math.isinf(value)
def compute_per_image(
gt: GtCoordinate, estimate: EstimateInput
) -> PerImageResult:
"""Compute error_m + AC-2/AC-3 pass flags for one image."""
if gt.image_id != estimate.image_id:
raise ValueError(
f"image_id mismatch: gt='{gt.image_id}' estimate='{estimate.image_id}'"
)
if _is_timeout(estimate.est_lat_deg) or _is_timeout(estimate.est_lon_deg):
return PerImageResult(
image_id=gt.image_id,
gt_lat=gt.lat_deg,
gt_lon=gt.lon_deg,
est_lat=estimate.est_lat_deg,
est_lon=estimate.est_lon_deg,
error_m=math.inf,
pass_50m=False,
pass_20m=False,
)
err = distance_m(gt.lat_deg, gt.lon_deg, estimate.est_lat_deg, estimate.est_lon_deg)
return PerImageResult(
image_id=gt.image_id,
gt_lat=gt.lat_deg,
gt_lon=gt.lon_deg,
est_lat=estimate.est_lat_deg,
est_lon=estimate.est_lon_deg,
error_m=err,
pass_50m=err <= 50.0,
pass_20m=err <= 20.0,
)
def evaluate(
gt_rows: Sequence[GtCoordinate],
estimates: Sequence[EstimateInput],
) -> tuple[list[PerImageResult], AggregateReport]:
"""Join GT + estimates by image_id, compute per-image + aggregate.
The GT order is authoritative — the resulting list is in GT order so
the CSV column is stable across runs. An estimate without a matching
GT row is an error (the scenario should not push a stranger image);
a GT row without a matching estimate is a timeout (recorded with inf).
"""
by_id = {e.image_id: e for e in estimates}
if len(by_id) != len(estimates):
seen: set[str] = set()
dupes: list[str] = []
for e in estimates:
if e.image_id in seen:
dupes.append(e.image_id)
seen.add(e.image_id)
raise ValueError(f"duplicate estimate image_ids: {sorted(set(dupes))}")
stranger_ids = sorted(set(by_id) - {g.image_id for g in gt_rows})
if stranger_ids:
raise ValueError(
f"estimate(s) for image_id(s) not in GT: {stranger_ids}"
)
results: list[PerImageResult] = []
timeout_count = 0
for gt in gt_rows:
est = by_id.get(gt.image_id)
if est is None:
est = EstimateInput(image_id=gt.image_id, est_lat_deg=math.inf, est_lon_deg=math.inf)
timeout_count += 1
elif _is_timeout(est.est_lat_deg) or _is_timeout(est.est_lon_deg):
timeout_count += 1
results.append(compute_per_image(gt, est))
aggregate = AggregateReport(
total_images=len(results),
pass_count_50m=sum(1 for r in results if r.pass_50m),
pass_count_20m=sum(1 for r in results if r.pass_20m),
timeout_count=timeout_count,
)
return results, aggregate
def write_csv_evidence(out_path: Path, results: Iterable[PerImageResult]) -> Path:
"""Write the FT-P-01 per-image evidence CSV.
Header: ``image_id, gt_lat, gt_lon, est_lat, est_lon, error_m, pass_50m, pass_20m``.
"""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"image_id",
"gt_lat",
"gt_lon",
"est_lat",
"est_lon",
"error_m",
"pass_50m",
"pass_20m",
]
)
for r in results:
writer.writerow(
[
r.image_id,
f"{r.gt_lat:.6f}",
f"{r.gt_lon:.6f}",
"inf" if math.isinf(r.est_lat) else f"{r.est_lat:.6f}",
"inf" if math.isinf(r.est_lon) else f"{r.est_lon:.6f}",
"inf" if math.isinf(r.error_m) else f"{r.error_m:.3f}",
"true" if r.pass_50m else "false",
"true" if r.pass_20m else "false",
]
)
return out_path
+284
View File
@@ -0,0 +1,284 @@
"""MRE budget evaluation for FT-P-05 / FT-P-06 (AZ-413 / AC-2.1b, AC-2.2).
The SUT exposes per-frame **MRE** (Mean Reprojection Error, in pixels)
for both:
* **Frame-to-frame** registrations — produced during the Derkachi replay
(FT-P-04 scope; the MRE per frame is recorded in the FDR archive
alongside the boolean success metric).
* **Cross-domain** registrations — produced when the satellite-anchor
pipeline matches a UAV frame against a satellite tile (FT-P-05 scope;
one MRE per still-image push).
FT-P-05 binds:
* AC-2 (per-image cross-domain): every image's MRE < 2.5 px.
* AC-3 (accuracy alongside MRE): inherits FT-P-01 thresholds (≥80 % at
50 m, ≥50 % at 20 m) but on the same image set; the helper reuses
``accuracy_evaluator`` for the geodesic part.
FT-P-06 binds AC-4: the 95th percentile MRE bound — < 1.0 px frame-to-frame
AND < 2.5 px cross-domain. The 95th percentile is computed with numpy's
default linear-interpolation algorithm (which the spec explicitly names).
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from statistics import median
from typing import Iterable, Sequence
import numpy as np
MRE_PER_IMAGE_BUDGET_PX = 2.5
MRE_P95_FRAME_TO_FRAME_BUDGET_PX = 1.0
MRE_P95_CROSS_DOMAIN_BUDGET_PX = 2.5
@dataclass(frozen=True)
class CrossDomainRecord:
"""One observation per still-image push (FT-P-05)."""
image_id: str
mre_px: float
error_m: float
@dataclass(frozen=True)
class FrameToFrameRecord:
"""One observation per video frame (FT-P-04 evidence reused by FT-P-06)."""
frame_index: int
mre_px: float
@dataclass(frozen=True)
class PerImageBudgetReport:
"""FT-P-05 AC-2: every image MRE < 2.5 px."""
total_images: int
pass_count: int
fail_image_ids: tuple[str, ...]
max_mre_px: float
budget_px: float = MRE_PER_IMAGE_BUDGET_PX
@property
def passes(self) -> bool:
return self.pass_count == self.total_images > 0
@dataclass(frozen=True)
class P95Report:
"""FT-P-06 AC-4: 95th-percentile budget."""
sample_count: int
p95_px: float
budget_px: float
@property
def passes(self) -> bool:
return self.sample_count > 0 and self.p95_px < self.budget_px
@dataclass(frozen=True)
class CombinedP95Report:
"""FT-P-06 combined assertion across both domains."""
frame_to_frame: P95Report
cross_domain: P95Report
@property
def passes(self) -> bool:
return self.frame_to_frame.passes and self.cross_domain.passes
def evaluate_per_image_budget(
records: Sequence[CrossDomainRecord],
*,
budget_px: float = MRE_PER_IMAGE_BUDGET_PX,
) -> PerImageBudgetReport:
"""AC-2 of FT-P-05: every cross-domain MRE strictly below ``budget_px``.
Strictness: the spec text "MRE < 2.5 px for all images" reads as a
strict less-than. A record at exactly 2.5 px FAILS (the matcher must
be inside the budget, not on the boundary).
"""
if budget_px <= 0:
raise ValueError(f"budget_px must be > 0, got {budget_px}")
fail_ids: list[str] = []
pass_count = 0
max_mre = 0.0
for r in records:
max_mre = max(max_mre, r.mre_px)
if r.mre_px < budget_px:
pass_count += 1
else:
fail_ids.append(r.image_id)
return PerImageBudgetReport(
total_images=len(records),
pass_count=pass_count,
fail_image_ids=tuple(fail_ids),
max_mre_px=max_mre,
budget_px=budget_px,
)
def evaluate_p95(
mre_samples: Sequence[float],
*,
budget_px: float,
) -> P95Report:
"""AC-4 of FT-P-06: 95th-percentile MRE strictly below ``budget_px``.
Percentile computed via ``numpy.percentile`` with the default
``method='linear'`` (linear interpolation between adjacent ranks).
The spec explicitly names that method.
"""
if budget_px <= 0:
raise ValueError(f"budget_px must be > 0, got {budget_px}")
n = len(mre_samples)
if n == 0:
return P95Report(sample_count=0, p95_px=float("nan"), budget_px=budget_px)
p95 = float(np.percentile(np.asarray(mre_samples, dtype=float), 95))
return P95Report(sample_count=n, p95_px=p95, budget_px=budget_px)
def evaluate_combined_p95(
frame_to_frame: Sequence[FrameToFrameRecord],
cross_domain: Sequence[CrossDomainRecord],
) -> CombinedP95Report:
"""FT-P-06 combined assertion using per-domain budgets."""
f2f = evaluate_p95(
[r.mre_px for r in frame_to_frame],
budget_px=MRE_P95_FRAME_TO_FRAME_BUDGET_PX,
)
xd = evaluate_p95(
[r.mre_px for r in cross_domain],
budget_px=MRE_P95_CROSS_DOMAIN_BUDGET_PX,
)
return CombinedP95Report(frame_to_frame=f2f, cross_domain=xd)
def load_cross_domain_csv(csv_path: Path) -> list[CrossDomainRecord]:
"""Read ``ft-p-05.csv`` back into typed records (used by FT-P-06)."""
if not csv_path.exists():
raise FileNotFoundError(
f"FT-P-05 evidence not found at {csv_path} — run FT-P-05 first."
)
records: list[CrossDomainRecord] = []
with csv_path.open() as fh:
reader = csv.DictReader(fh)
needed = {"image_id", "mre_px", "error_m"}
missing = needed - set(reader.fieldnames or [])
if missing:
raise ValueError(f"FT-P-05 CSV missing columns: {sorted(missing)}")
for row in reader:
records.append(
CrossDomainRecord(
image_id=row["image_id"],
mre_px=float(row["mre_px"]),
error_m=float(row["error_m"]) if row["error_m"] != "inf" else float("inf"),
)
)
return records
def load_frame_to_frame_csv(csv_path: Path) -> list[FrameToFrameRecord]:
"""Read frame-to-frame MRE from the FT-P-04 evidence CSV.
The FT-P-04 CSV currently includes ``registration_success`` per frame
but NOT MRE; that column will be added when the SUT exposes it
(AC-NEW-3 FDR schema). This loader expects a ``mre_px`` column —
raises ValueError if absent so the FT-P-06 scenario fails loudly.
"""
if not csv_path.exists():
raise FileNotFoundError(
f"FT-P-04 evidence not found at {csv_path} — run FT-P-04 first."
)
records: list[FrameToFrameRecord] = []
with csv_path.open() as fh:
reader = csv.DictReader(fh)
if "mre_px" not in (reader.fieldnames or []):
raise ValueError(
"FT-P-04 evidence is missing the 'mre_px' column required by FT-P-06. "
"The SUT must emit per-frame MRE in the FDR archive (AC-NEW-3)."
)
for row in reader:
mre_str = row["mre_px"].strip()
if not mre_str:
continue
records.append(
FrameToFrameRecord(
frame_index=int(row["frame_index"]),
mre_px=float(mre_str),
)
)
return records
def write_cross_domain_csv(
out_path: Path,
records: Iterable[CrossDomainRecord],
*,
pass_50m: dict[str, bool] | None = None,
pass_20m: dict[str, bool] | None = None,
) -> Path:
"""Write the FT-P-05 per-image evidence CSV.
Header: ``image_id, est_lat, est_lon, error_m, mre_px, pass_50m,
pass_20m, pass_mre``. The lat/lon columns are emitted as blanks here
(the scenario file fills them via ``write_csv_evidence`` from
``accuracy_evaluator`` — this writer is for the FT-P-06-relevant
columns only).
"""
pass_50m = pass_50m or {}
pass_20m = pass_20m or {}
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"image_id",
"est_lat",
"est_lon",
"error_m",
"mre_px",
"pass_50m",
"pass_20m",
"pass_mre",
]
)
for r in records:
writer.writerow(
[
r.image_id,
"",
"",
"inf" if r.error_m == float("inf") else f"{r.error_m:.3f}",
f"{r.mre_px:.4f}",
"true" if pass_50m.get(r.image_id, False) else "false",
"true" if pass_20m.get(r.image_id, False) else "false",
"true" if r.mre_px < MRE_PER_IMAGE_BUDGET_PX else "false",
]
)
return out_path
def summarize_mre_distribution(records: Sequence[FrameToFrameRecord | CrossDomainRecord]) -> dict[str, float]:
"""Summary stats for diagnostic logging (median, p95, max).
Convenience helper; not used by the AC assertions themselves.
"""
if not records:
return {"count": 0.0, "median": float("nan"), "p95": float("nan"), "max": float("nan")}
samples = [r.mre_px for r in records]
return {
"count": float(len(samples)),
"median": float(median(samples)),
"p95": float(np.percentile(np.asarray(samples, dtype=float), 95)),
"max": float(max(samples)),
}
@@ -0,0 +1,382 @@
"""Normal-segment classification + success-ratio for FT-P-04 (AZ-412 / AC-2.1a).
The SUT exposes a per-frame ``registration_success`` boolean (either via
``NAMED_VALUE_FLOAT`` MAVLink messages or via the post-run FDR archive).
This helper:
1. Reads the Derkachi ``data_imu.csv`` (SCALED_IMU2 + GLOBAL_POSITION_INT
columns) and derives a per-row attitude approximation from accelerometer
readings (the spec's AC-1 explicitly says attitude is
``SCALED_IMU2``-derived, NOT internal SUT state).
2. Classifies each video frame as **normal** when both:
* bank/pitch within ±10° of nadir, AND
* inferred prior-frame overlap ≥40 % (heuristic from translation magnitude).
3. Computes the success ratio over the **normal** set only — sharp-turn
frames are excluded from the denominator per AC-3.
4. Asserts the ratio meets the AC-2 budget (≥0.95).
The video is 30 fps; the IMU/telemetry CSV is 10 Hz (one row per 100 ms,
i.e. 3 video frames per row). The classifier expands each telemetry row
to 3 video-frame indices (the same row drives 3 consecutive frames).
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Mapping, Sequence
ATTITUDE_LIMIT_DEG = 10.0
TARGET_OVERLAP_FRACTION = 0.40
SUCCESS_RATIO_REQUIRED = 0.95
VIDEO_FPS = 30
IMU_HZ = 10
VIDEO_FRAMES_PER_IMU_ROW = VIDEO_FPS // IMU_HZ
# Derkachi nadir camera: the camera_info.md fixture records ~141 m altitude
# AGL and ~55° horizontal FOV. The "ground footprint width" at nadir is
# 2 * alt * tan(FOV/2) ≈ 2 * 141 * tan(27.5°) ≈ 147 m. We use a single
# scenario-wide ground footprint to keep the heuristic transparent.
DEFAULT_GROUND_FOOTPRINT_M = 147.0
@dataclass(frozen=True)
class ImuTelemetryRow:
"""One row of ``data_imu.csv`` distilled to the columns the classifier needs.
Velocity fields are floats (cm/s) because the shipped ``data_imu.csv``
stores them in scientific notation (e.g. ``-4.44E-16`` near hover).
Acceleration fields stay int per the SCALED_IMU2 wire format.
"""
timestamp_ms: float
time_s: float
xacc: int
yacc: int
zacc: int
vx_cms: float
vy_cms: float
vz_cms: float
@dataclass(frozen=True)
class FrameAttitude:
"""Bank + pitch derived from accel; used for the ±10° gate.
Accel-only attitude assumes the platform is in near-equilibrium
flight (the dominant accel is gravity). Valid for the cropped
nadir-cruise segments AC-2.1a targets; explicitly NOT valid during
aggressive manoeuvres — but those are exactly the frames AC-2.1a
wants to EXCLUDE from the denominator. So the limitation matches
the AC intent.
"""
bank_deg: float
pitch_deg: float
@dataclass(frozen=True)
class FrameClassification:
"""Per-video-frame classification used to build the FT-P-04 denominator."""
frame_index: int
imu_row_index: int
attitude: FrameAttitude
translation_m: float
overlap_fraction: float
is_normal: bool
excluded_reason: str = ""
@dataclass(frozen=True)
class SuccessReport:
"""Aggregate report consumed by the scenario assertion.
``ratio = success_count / denominator`` where ``denominator`` is the
count of normal frames (sharp-turn / low-overlap frames are excluded
per AC-3 — they are counted in ``excluded_count`` for diagnostic
clarity).
"""
success_count: int
denominator: int
ratio: float
excluded_count: int
excluded_by_attitude: int
excluded_by_overlap: int
excluded_by_missing_metric: int
ratio_required: float = SUCCESS_RATIO_REQUIRED
@property
def passes(self) -> bool:
return self.denominator > 0 and self.ratio >= self.ratio_required
def load_imu_telemetry(csv_path: Path) -> list[ImuTelemetryRow]:
"""Read ``data_imu.csv`` and return one row per non-blank entry.
Only the columns the classifier needs are kept. Other columns are
ignored to keep the classifier independent of upstream column churn.
"""
if not csv_path.exists():
raise FileNotFoundError(
f"data_imu.csv not found at {csv_path} — bind-mount the Derkachi fixture"
)
needed = {
"timestamp(ms)",
"Time",
"SCALED_IMU2.xacc",
"SCALED_IMU2.yacc",
"SCALED_IMU2.zacc",
"GLOBAL_POSITION_INT.vx",
"GLOBAL_POSITION_INT.vy",
"GLOBAL_POSITION_INT.vz",
}
rows: list[ImuTelemetryRow] = []
with csv_path.open() as fh:
reader = csv.DictReader(fh)
missing = needed - set(reader.fieldnames or [])
if missing:
raise ValueError(f"data_imu.csv missing columns: {sorted(missing)}")
for raw in reader:
if not raw["timestamp(ms)"].strip():
continue
rows.append(
ImuTelemetryRow(
timestamp_ms=float(raw["timestamp(ms)"]),
time_s=float(raw["Time"]),
xacc=int(float(raw["SCALED_IMU2.xacc"])),
yacc=int(float(raw["SCALED_IMU2.yacc"])),
zacc=int(float(raw["SCALED_IMU2.zacc"])),
vx_cms=float(raw["GLOBAL_POSITION_INT.vx"]),
vy_cms=float(raw["GLOBAL_POSITION_INT.vy"]),
vz_cms=float(raw["GLOBAL_POSITION_INT.vz"]),
)
)
return rows
def compute_attitude(row: ImuTelemetryRow) -> FrameAttitude:
"""Derive bank + pitch from accelerometer (gravity-as-down assumption).
SCALED_IMU2 acc components are in mg (milli-g). Sign convention:
body-frame +x forward, +y right, +z down. With dominant gravity on
+z the resting attitude has xacc=0, yacc=0, zacc=-1000 (negative
because the body frame measures the reaction force pointing UP).
pitch = atan2(-xacc, sqrt(yacc² + zacc²))
bank = atan2(yacc, zacc)
"""
x = float(row.xacc)
y = float(row.yacc)
z = float(row.zacc)
pitch_rad = math.atan2(-x, math.sqrt(y * y + z * z))
bank_rad = math.atan2(y, z)
# The atan2(y, z) convention puts level flight at ±π (since z is
# negative gravity); we want level = 0, so subtract π and wrap.
bank_deg_raw = math.degrees(bank_rad)
if bank_deg_raw > 90.0:
bank_deg = bank_deg_raw - 180.0
elif bank_deg_raw < -90.0:
bank_deg = bank_deg_raw + 180.0
else:
bank_deg = bank_deg_raw
return FrameAttitude(bank_deg=bank_deg, pitch_deg=math.degrees(pitch_rad))
def compute_translation_m(row: ImuTelemetryRow, prev_row: ImuTelemetryRow | None) -> float:
"""Ground-plane translation between consecutive frames in meters.
Uses the GLOBAL_POSITION_INT velocity (vx, vy in cm/s); vz is
excluded because vertical motion mostly affects scale, not overlap.
Per-frame dt = 1/30 s. With telemetry at 10 Hz, the same velocity
drives 3 consecutive frames.
"""
vx_ms = row.vx_cms / 100.0
vy_ms = row.vy_cms / 100.0
horizontal_speed = math.hypot(vx_ms, vy_ms)
dt_s = 1.0 / VIDEO_FPS
return horizontal_speed * dt_s
def compute_overlap_fraction(
translation_m: float, ground_footprint_m: float
) -> float:
"""Fraction of ground footprint that overlaps with the prior frame.
Approximation: assume a square ground footprint of side
``ground_footprint_m``. After translating by ``translation_m`` in
the horizontal plane, the overlap is
``max(0, 1 - translation_m / ground_footprint_m)``.
This is an upper bound — diagonal motion or rotation eats more
overlap. The ±10° attitude gate rules out the rotation-heavy
frames; pure translation is what survives, and this approximation
is tight for cruise flight.
"""
if ground_footprint_m <= 0:
raise ValueError(f"ground_footprint_m must be > 0, got {ground_footprint_m}")
fraction = 1.0 - translation_m / ground_footprint_m
return max(0.0, min(1.0, fraction))
def classify_frames(
imu_rows: Sequence[ImuTelemetryRow],
*,
attitude_limit_deg: float = ATTITUDE_LIMIT_DEG,
min_overlap_fraction: float = TARGET_OVERLAP_FRACTION,
ground_footprint_m: float = DEFAULT_GROUND_FOOTPRINT_M,
video_frames_per_imu_row: int = VIDEO_FRAMES_PER_IMU_ROW,
) -> list[FrameClassification]:
"""Build the per-video-frame classification list.
Each ``ImuTelemetryRow`` drives ``video_frames_per_imu_row``
consecutive video frames (3 frames per IMU row by default). Frame
indices are 0-based and contiguous.
Determinism: this function depends only on the input rows + tunables
— same input → same output.
"""
if attitude_limit_deg <= 0:
raise ValueError(f"attitude_limit_deg must be > 0, got {attitude_limit_deg}")
if not 0.0 < min_overlap_fraction < 1.0:
raise ValueError(
f"min_overlap_fraction must be in (0, 1), got {min_overlap_fraction}"
)
classifications: list[FrameClassification] = []
prev_row: ImuTelemetryRow | None = None
frame_index = 0
for imu_row_index, row in enumerate(imu_rows):
attitude = compute_attitude(row)
translation_m = compute_translation_m(row, prev_row)
overlap_fraction = compute_overlap_fraction(translation_m, ground_footprint_m)
attitude_ok = (
abs(attitude.bank_deg) <= attitude_limit_deg
and abs(attitude.pitch_deg) <= attitude_limit_deg
)
overlap_ok = overlap_fraction >= min_overlap_fraction
is_normal = attitude_ok and overlap_ok
if not attitude_ok:
reason = "attitude_exceeds_limit"
elif not overlap_ok:
reason = "overlap_below_threshold"
else:
reason = ""
for _ in range(video_frames_per_imu_row):
classifications.append(
FrameClassification(
frame_index=frame_index,
imu_row_index=imu_row_index,
attitude=attitude,
translation_m=translation_m,
overlap_fraction=overlap_fraction,
is_normal=is_normal,
excluded_reason=reason,
)
)
frame_index += 1
prev_row = row
return classifications
def compute_success_ratio(
classifications: Sequence[FrameClassification],
registration_success_by_frame: Mapping[int, bool],
) -> SuccessReport:
"""Compute the success ratio over the normal-frame denominator.
Parameters
----------
classifications : the per-frame classification list (output of
``classify_frames``).
registration_success_by_frame : dict[frame_index, bool] — populated
from ``NAMED_VALUE_FLOAT`` listener output or post-run FDR read.
Frames missing from this dict are excluded from the denominator
and counted in ``excluded_by_missing_metric`` (the SUT failed to
emit the metric — AC-2 of the AC-NEW-3 FDR-schema spec covers
the schema; this scenario complains if the metric is absent).
Returns
-------
SuccessReport
"""
success = 0
denominator = 0
excluded_by_attitude = 0
excluded_by_overlap = 0
excluded_by_missing_metric = 0
for cls in classifications:
if not cls.is_normal:
if cls.excluded_reason == "attitude_exceeds_limit":
excluded_by_attitude += 1
elif cls.excluded_reason == "overlap_below_threshold":
excluded_by_overlap += 1
continue
metric = registration_success_by_frame.get(cls.frame_index)
if metric is None:
excluded_by_missing_metric += 1
continue
denominator += 1
if metric:
success += 1
excluded_count = excluded_by_attitude + excluded_by_overlap + excluded_by_missing_metric
ratio = (success / denominator) if denominator > 0 else 0.0
return SuccessReport(
success_count=success,
denominator=denominator,
ratio=ratio,
excluded_count=excluded_count,
excluded_by_attitude=excluded_by_attitude,
excluded_by_overlap=excluded_by_overlap,
excluded_by_missing_metric=excluded_by_missing_metric,
)
def write_csv_evidence(
out_path: Path,
classifications: Iterable[FrameClassification],
registration_success_by_frame: Mapping[int, bool],
) -> Path:
"""Write the FT-P-04 per-frame evidence CSV.
Header: ``frame_index, imu_row_index, bank_deg, pitch_deg,
translation_m, overlap_fraction, is_normal, excluded_reason,
registration_success``.
"""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"frame_index",
"imu_row_index",
"bank_deg",
"pitch_deg",
"translation_m",
"overlap_fraction",
"is_normal",
"excluded_reason",
"registration_success",
]
)
for cls in classifications:
metric = registration_success_by_frame.get(cls.frame_index)
writer.writerow(
[
cls.frame_index,
cls.imu_row_index,
f"{cls.attitude.bank_deg:.3f}",
f"{cls.attitude.pitch_deg:.3f}",
f"{cls.translation_m:.4f}",
f"{cls.overlap_fraction:.4f}",
"true" if cls.is_normal else "false",
cls.excluded_reason,
"" if metric is None else ("true" if metric else "false"),
]
)
return out_path