Files
Oleksandr Bezdieniezhnykh 29ac16cfcb [AZ-409] [AZ-412] [AZ-413] Batch 70: FT-P-01/04/05/06 scenarios
AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy:
- accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts
- test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay
  + sitl_observer NotImplementedError; AC-4 timeout discipline

AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments:
- registration_classifier.py: accel-derived attitude + overlap heuristic
  + success ratio with AC-3 sharp-turn exclusion
- test_ft_p_04_derkachi_f2f_registration.py: scenario gated on
  frame_source_replay + imu_replay + fdr_reader

AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets:
- mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile
  via numpy linear interp + combined report
- test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses
  accuracy_evaluator for geodesic join
- test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV
  evidence; skips when either upstream CSV missing

Tests: 325 unit tests pass (+77 vs batch 69).
Reports: batch_70_report.md, batch_70_review.md (PASS).
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 18:10:46 +03:00

383 lines
14 KiB
Python

"""Normal-segment classification + success-ratio for FT-P-04 (AZ-412 / AC-2.1a).
The SUT exposes a per-frame ``registration_success`` boolean (either via
``NAMED_VALUE_FLOAT`` MAVLink messages or via the post-run FDR archive).
This helper:
1. Reads the Derkachi ``data_imu.csv`` (SCALED_IMU2 + GLOBAL_POSITION_INT
columns) and derives a per-row attitude approximation from accelerometer
readings (the spec's AC-1 explicitly says attitude is
``SCALED_IMU2``-derived, NOT internal SUT state).
2. Classifies each video frame as **normal** when both:
* bank/pitch within ±10° of nadir, AND
* inferred prior-frame overlap ≥40 % (heuristic from translation magnitude).
3. Computes the success ratio over the **normal** set only — sharp-turn
frames are excluded from the denominator per AC-3.
4. Asserts the ratio meets the AC-2 budget (≥0.95).
The video is 30 fps; the IMU/telemetry CSV is 10 Hz (one row per 100 ms,
i.e. 3 video frames per row). The classifier expands each telemetry row
to 3 video-frame indices (the same row drives 3 consecutive frames).
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Mapping, Sequence
ATTITUDE_LIMIT_DEG = 10.0
TARGET_OVERLAP_FRACTION = 0.40
SUCCESS_RATIO_REQUIRED = 0.95
VIDEO_FPS = 30
IMU_HZ = 10
VIDEO_FRAMES_PER_IMU_ROW = VIDEO_FPS // IMU_HZ
# Derkachi nadir camera: the camera_info.md fixture records ~141 m altitude
# AGL and ~55° horizontal FOV. The "ground footprint width" at nadir is
# 2 * alt * tan(FOV/2) ≈ 2 * 141 * tan(27.5°) ≈ 147 m. We use a single
# scenario-wide ground footprint to keep the heuristic transparent.
DEFAULT_GROUND_FOOTPRINT_M = 147.0
@dataclass(frozen=True)
class ImuTelemetryRow:
"""One row of ``data_imu.csv`` distilled to the columns the classifier needs.
Velocity fields are floats (cm/s) because the shipped ``data_imu.csv``
stores them in scientific notation (e.g. ``-4.44E-16`` near hover).
Acceleration fields stay int per the SCALED_IMU2 wire format.
"""
timestamp_ms: float
time_s: float
xacc: int
yacc: int
zacc: int
vx_cms: float
vy_cms: float
vz_cms: float
@dataclass(frozen=True)
class FrameAttitude:
"""Bank + pitch derived from accel; used for the ±10° gate.
Accel-only attitude assumes the platform is in near-equilibrium
flight (the dominant accel is gravity). Valid for the cropped
nadir-cruise segments AC-2.1a targets; explicitly NOT valid during
aggressive manoeuvres — but those are exactly the frames AC-2.1a
wants to EXCLUDE from the denominator. So the limitation matches
the AC intent.
"""
bank_deg: float
pitch_deg: float
@dataclass(frozen=True)
class FrameClassification:
"""Per-video-frame classification used to build the FT-P-04 denominator."""
frame_index: int
imu_row_index: int
attitude: FrameAttitude
translation_m: float
overlap_fraction: float
is_normal: bool
excluded_reason: str = ""
@dataclass(frozen=True)
class SuccessReport:
"""Aggregate report consumed by the scenario assertion.
``ratio = success_count / denominator`` where ``denominator`` is the
count of normal frames (sharp-turn / low-overlap frames are excluded
per AC-3 — they are counted in ``excluded_count`` for diagnostic
clarity).
"""
success_count: int
denominator: int
ratio: float
excluded_count: int
excluded_by_attitude: int
excluded_by_overlap: int
excluded_by_missing_metric: int
ratio_required: float = SUCCESS_RATIO_REQUIRED
@property
def passes(self) -> bool:
return self.denominator > 0 and self.ratio >= self.ratio_required
def load_imu_telemetry(csv_path: Path) -> list[ImuTelemetryRow]:
"""Read ``data_imu.csv`` and return one row per non-blank entry.
Only the columns the classifier needs are kept. Other columns are
ignored to keep the classifier independent of upstream column churn.
"""
if not csv_path.exists():
raise FileNotFoundError(
f"data_imu.csv not found at {csv_path} — bind-mount the Derkachi fixture"
)
needed = {
"timestamp(ms)",
"Time",
"SCALED_IMU2.xacc",
"SCALED_IMU2.yacc",
"SCALED_IMU2.zacc",
"GLOBAL_POSITION_INT.vx",
"GLOBAL_POSITION_INT.vy",
"GLOBAL_POSITION_INT.vz",
}
rows: list[ImuTelemetryRow] = []
with csv_path.open() as fh:
reader = csv.DictReader(fh)
missing = needed - set(reader.fieldnames or [])
if missing:
raise ValueError(f"data_imu.csv missing columns: {sorted(missing)}")
for raw in reader:
if not raw["timestamp(ms)"].strip():
continue
rows.append(
ImuTelemetryRow(
timestamp_ms=float(raw["timestamp(ms)"]),
time_s=float(raw["Time"]),
xacc=int(float(raw["SCALED_IMU2.xacc"])),
yacc=int(float(raw["SCALED_IMU2.yacc"])),
zacc=int(float(raw["SCALED_IMU2.zacc"])),
vx_cms=float(raw["GLOBAL_POSITION_INT.vx"]),
vy_cms=float(raw["GLOBAL_POSITION_INT.vy"]),
vz_cms=float(raw["GLOBAL_POSITION_INT.vz"]),
)
)
return rows
def compute_attitude(row: ImuTelemetryRow) -> FrameAttitude:
"""Derive bank + pitch from accelerometer (gravity-as-down assumption).
SCALED_IMU2 acc components are in mg (milli-g). Sign convention:
body-frame +x forward, +y right, +z down. With dominant gravity on
+z the resting attitude has xacc=0, yacc=0, zacc=-1000 (negative
because the body frame measures the reaction force pointing UP).
pitch = atan2(-xacc, sqrt(yacc² + zacc²))
bank = atan2(yacc, zacc)
"""
x = float(row.xacc)
y = float(row.yacc)
z = float(row.zacc)
pitch_rad = math.atan2(-x, math.sqrt(y * y + z * z))
bank_rad = math.atan2(y, z)
# The atan2(y, z) convention puts level flight at ±π (since z is
# negative gravity); we want level = 0, so subtract π and wrap.
bank_deg_raw = math.degrees(bank_rad)
if bank_deg_raw > 90.0:
bank_deg = bank_deg_raw - 180.0
elif bank_deg_raw < -90.0:
bank_deg = bank_deg_raw + 180.0
else:
bank_deg = bank_deg_raw
return FrameAttitude(bank_deg=bank_deg, pitch_deg=math.degrees(pitch_rad))
def compute_translation_m(row: ImuTelemetryRow, prev_row: ImuTelemetryRow | None) -> float:
"""Ground-plane translation between consecutive frames in meters.
Uses the GLOBAL_POSITION_INT velocity (vx, vy in cm/s); vz is
excluded because vertical motion mostly affects scale, not overlap.
Per-frame dt = 1/30 s. With telemetry at 10 Hz, the same velocity
drives 3 consecutive frames.
"""
vx_ms = row.vx_cms / 100.0
vy_ms = row.vy_cms / 100.0
horizontal_speed = math.hypot(vx_ms, vy_ms)
dt_s = 1.0 / VIDEO_FPS
return horizontal_speed * dt_s
def compute_overlap_fraction(
translation_m: float, ground_footprint_m: float
) -> float:
"""Fraction of ground footprint that overlaps with the prior frame.
Approximation: assume a square ground footprint of side
``ground_footprint_m``. After translating by ``translation_m`` in
the horizontal plane, the overlap is
``max(0, 1 - translation_m / ground_footprint_m)``.
This is an upper bound — diagonal motion or rotation eats more
overlap. The ±10° attitude gate rules out the rotation-heavy
frames; pure translation is what survives, and this approximation
is tight for cruise flight.
"""
if ground_footprint_m <= 0:
raise ValueError(f"ground_footprint_m must be > 0, got {ground_footprint_m}")
fraction = 1.0 - translation_m / ground_footprint_m
return max(0.0, min(1.0, fraction))
def classify_frames(
imu_rows: Sequence[ImuTelemetryRow],
*,
attitude_limit_deg: float = ATTITUDE_LIMIT_DEG,
min_overlap_fraction: float = TARGET_OVERLAP_FRACTION,
ground_footprint_m: float = DEFAULT_GROUND_FOOTPRINT_M,
video_frames_per_imu_row: int = VIDEO_FRAMES_PER_IMU_ROW,
) -> list[FrameClassification]:
"""Build the per-video-frame classification list.
Each ``ImuTelemetryRow`` drives ``video_frames_per_imu_row``
consecutive video frames (3 frames per IMU row by default). Frame
indices are 0-based and contiguous.
Determinism: this function depends only on the input rows + tunables
— same input → same output.
"""
if attitude_limit_deg <= 0:
raise ValueError(f"attitude_limit_deg must be > 0, got {attitude_limit_deg}")
if not 0.0 < min_overlap_fraction < 1.0:
raise ValueError(
f"min_overlap_fraction must be in (0, 1), got {min_overlap_fraction}"
)
classifications: list[FrameClassification] = []
prev_row: ImuTelemetryRow | None = None
frame_index = 0
for imu_row_index, row in enumerate(imu_rows):
attitude = compute_attitude(row)
translation_m = compute_translation_m(row, prev_row)
overlap_fraction = compute_overlap_fraction(translation_m, ground_footprint_m)
attitude_ok = (
abs(attitude.bank_deg) <= attitude_limit_deg
and abs(attitude.pitch_deg) <= attitude_limit_deg
)
overlap_ok = overlap_fraction >= min_overlap_fraction
is_normal = attitude_ok and overlap_ok
if not attitude_ok:
reason = "attitude_exceeds_limit"
elif not overlap_ok:
reason = "overlap_below_threshold"
else:
reason = ""
for _ in range(video_frames_per_imu_row):
classifications.append(
FrameClassification(
frame_index=frame_index,
imu_row_index=imu_row_index,
attitude=attitude,
translation_m=translation_m,
overlap_fraction=overlap_fraction,
is_normal=is_normal,
excluded_reason=reason,
)
)
frame_index += 1
prev_row = row
return classifications
def compute_success_ratio(
classifications: Sequence[FrameClassification],
registration_success_by_frame: Mapping[int, bool],
) -> SuccessReport:
"""Compute the success ratio over the normal-frame denominator.
Parameters
----------
classifications : the per-frame classification list (output of
``classify_frames``).
registration_success_by_frame : dict[frame_index, bool] — populated
from ``NAMED_VALUE_FLOAT`` listener output or post-run FDR read.
Frames missing from this dict are excluded from the denominator
and counted in ``excluded_by_missing_metric`` (the SUT failed to
emit the metric — AC-2 of the AC-NEW-3 FDR-schema spec covers
the schema; this scenario complains if the metric is absent).
Returns
-------
SuccessReport
"""
success = 0
denominator = 0
excluded_by_attitude = 0
excluded_by_overlap = 0
excluded_by_missing_metric = 0
for cls in classifications:
if not cls.is_normal:
if cls.excluded_reason == "attitude_exceeds_limit":
excluded_by_attitude += 1
elif cls.excluded_reason == "overlap_below_threshold":
excluded_by_overlap += 1
continue
metric = registration_success_by_frame.get(cls.frame_index)
if metric is None:
excluded_by_missing_metric += 1
continue
denominator += 1
if metric:
success += 1
excluded_count = excluded_by_attitude + excluded_by_overlap + excluded_by_missing_metric
ratio = (success / denominator) if denominator > 0 else 0.0
return SuccessReport(
success_count=success,
denominator=denominator,
ratio=ratio,
excluded_count=excluded_count,
excluded_by_attitude=excluded_by_attitude,
excluded_by_overlap=excluded_by_overlap,
excluded_by_missing_metric=excluded_by_missing_metric,
)
def write_csv_evidence(
out_path: Path,
classifications: Iterable[FrameClassification],
registration_success_by_frame: Mapping[int, bool],
) -> Path:
"""Write the FT-P-04 per-frame evidence CSV.
Header: ``frame_index, imu_row_index, bank_deg, pitch_deg,
translation_m, overlap_fraction, is_normal, excluded_reason,
registration_success``.
"""
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="") as fh:
writer = csv.writer(fh)
writer.writerow(
[
"frame_index",
"imu_row_index",
"bank_deg",
"pitch_deg",
"translation_m",
"overlap_fraction",
"is_normal",
"excluded_reason",
"registration_success",
]
)
for cls in classifications:
metric = registration_success_by_frame.get(cls.frame_index)
writer.writerow(
[
cls.frame_index,
cls.imu_row_index,
f"{cls.attitude.bank_deg:.3f}",
f"{cls.attitude.pitch_deg:.3f}",
f"{cls.translation_m:.4f}",
f"{cls.overlap_fraction:.4f}",
"true" if cls.is_normal else "false",
cls.excluded_reason,
"" if metric is None else ("true" if metric else "false"),
]
)
return out_path