mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 14:01:13 +00:00
29ac16cfcb
AZ-409 (3pt) — FT-P-01 still-image frame-center accuracy: - accuracy_evaluator.py: GT loader + Vincenty error + AC-2/AC-3 pass-counts - test_ft_p_01_still_image_accuracy.py: scenario gated on frame_source_replay + sitl_observer NotImplementedError; AC-4 timeout discipline AZ-412 (3pt) — FT-P-04 Derkachi f2f registration >=95% on normal segments: - registration_classifier.py: accel-derived attitude + overlap heuristic + success ratio with AC-3 sharp-turn exclusion - test_ft_p_04_derkachi_f2f_registration.py: scenario gated on frame_source_replay + imu_replay + fdr_reader AZ-413 (3pt) — FT-P-05 + FT-P-06 cross-domain MRE budgets: - mre_evaluator.py: per-image budget (strict <2.5px) + 95th-percentile via numpy linear interp + combined report - test_ft_p_05_sat_anchor.py: cross-domain scenario, reuses accuracy_evaluator for geodesic join - test_ft_p_06_mre_budgets.py: pure piggyback on FT-P-04 + FT-P-05 CSV evidence; skips when either upstream CSV missing Tests: 325 unit tests pass (+77 vs batch 69). Reports: batch_70_report.md, batch_70_review.md (PASS). Co-authored-by: Cursor <cursoragent@cursor.com>
383 lines
14 KiB
Python
383 lines
14 KiB
Python
"""Normal-segment classification + success-ratio for FT-P-04 (AZ-412 / AC-2.1a).
|
|
|
|
The SUT exposes a per-frame ``registration_success`` boolean (either via
|
|
``NAMED_VALUE_FLOAT`` MAVLink messages or via the post-run FDR archive).
|
|
This helper:
|
|
|
|
1. Reads the Derkachi ``data_imu.csv`` (SCALED_IMU2 + GLOBAL_POSITION_INT
|
|
columns) and derives a per-row attitude approximation from accelerometer
|
|
readings (the spec's AC-1 explicitly says attitude is
|
|
``SCALED_IMU2``-derived, NOT internal SUT state).
|
|
2. Classifies each video frame as **normal** when both:
|
|
* bank/pitch within ±10° of nadir, AND
|
|
* inferred prior-frame overlap ≥40 % (heuristic from translation magnitude).
|
|
3. Computes the success ratio over the **normal** set only — sharp-turn
|
|
frames are excluded from the denominator per AC-3.
|
|
4. Asserts the ratio meets the AC-2 budget (≥0.95).
|
|
|
|
The video is 30 fps; the IMU/telemetry CSV is 10 Hz (one row per 100 ms,
|
|
i.e. 3 video frames per row). The classifier expands each telemetry row
|
|
to 3 video-frame indices (the same row drives 3 consecutive frames).
|
|
|
|
Public-boundary discipline: this module does NOT import any
|
|
``src/gps_denied_onboard`` symbol.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Iterable, Mapping, Sequence
|
|
|
|
ATTITUDE_LIMIT_DEG = 10.0
|
|
TARGET_OVERLAP_FRACTION = 0.40
|
|
SUCCESS_RATIO_REQUIRED = 0.95
|
|
VIDEO_FPS = 30
|
|
IMU_HZ = 10
|
|
VIDEO_FRAMES_PER_IMU_ROW = VIDEO_FPS // IMU_HZ
|
|
# Derkachi nadir camera: the camera_info.md fixture records ~141 m altitude
|
|
# AGL and ~55° horizontal FOV. The "ground footprint width" at nadir is
|
|
# 2 * alt * tan(FOV/2) ≈ 2 * 141 * tan(27.5°) ≈ 147 m. We use a single
|
|
# scenario-wide ground footprint to keep the heuristic transparent.
|
|
DEFAULT_GROUND_FOOTPRINT_M = 147.0
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ImuTelemetryRow:
|
|
"""One row of ``data_imu.csv`` distilled to the columns the classifier needs.
|
|
|
|
Velocity fields are floats (cm/s) because the shipped ``data_imu.csv``
|
|
stores them in scientific notation (e.g. ``-4.44E-16`` near hover).
|
|
Acceleration fields stay int per the SCALED_IMU2 wire format.
|
|
"""
|
|
|
|
timestamp_ms: float
|
|
time_s: float
|
|
xacc: int
|
|
yacc: int
|
|
zacc: int
|
|
vx_cms: float
|
|
vy_cms: float
|
|
vz_cms: float
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FrameAttitude:
|
|
"""Bank + pitch derived from accel; used for the ±10° gate.
|
|
|
|
Accel-only attitude assumes the platform is in near-equilibrium
|
|
flight (the dominant accel is gravity). Valid for the cropped
|
|
nadir-cruise segments AC-2.1a targets; explicitly NOT valid during
|
|
aggressive manoeuvres — but those are exactly the frames AC-2.1a
|
|
wants to EXCLUDE from the denominator. So the limitation matches
|
|
the AC intent.
|
|
"""
|
|
|
|
bank_deg: float
|
|
pitch_deg: float
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FrameClassification:
|
|
"""Per-video-frame classification used to build the FT-P-04 denominator."""
|
|
|
|
frame_index: int
|
|
imu_row_index: int
|
|
attitude: FrameAttitude
|
|
translation_m: float
|
|
overlap_fraction: float
|
|
is_normal: bool
|
|
excluded_reason: str = ""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SuccessReport:
|
|
"""Aggregate report consumed by the scenario assertion.
|
|
|
|
``ratio = success_count / denominator`` where ``denominator`` is the
|
|
count of normal frames (sharp-turn / low-overlap frames are excluded
|
|
per AC-3 — they are counted in ``excluded_count`` for diagnostic
|
|
clarity).
|
|
"""
|
|
|
|
success_count: int
|
|
denominator: int
|
|
ratio: float
|
|
excluded_count: int
|
|
excluded_by_attitude: int
|
|
excluded_by_overlap: int
|
|
excluded_by_missing_metric: int
|
|
ratio_required: float = SUCCESS_RATIO_REQUIRED
|
|
|
|
@property
|
|
def passes(self) -> bool:
|
|
return self.denominator > 0 and self.ratio >= self.ratio_required
|
|
|
|
|
|
def load_imu_telemetry(csv_path: Path) -> list[ImuTelemetryRow]:
|
|
"""Read ``data_imu.csv`` and return one row per non-blank entry.
|
|
|
|
Only the columns the classifier needs are kept. Other columns are
|
|
ignored to keep the classifier independent of upstream column churn.
|
|
"""
|
|
if not csv_path.exists():
|
|
raise FileNotFoundError(
|
|
f"data_imu.csv not found at {csv_path} — bind-mount the Derkachi fixture"
|
|
)
|
|
needed = {
|
|
"timestamp(ms)",
|
|
"Time",
|
|
"SCALED_IMU2.xacc",
|
|
"SCALED_IMU2.yacc",
|
|
"SCALED_IMU2.zacc",
|
|
"GLOBAL_POSITION_INT.vx",
|
|
"GLOBAL_POSITION_INT.vy",
|
|
"GLOBAL_POSITION_INT.vz",
|
|
}
|
|
rows: list[ImuTelemetryRow] = []
|
|
with csv_path.open() as fh:
|
|
reader = csv.DictReader(fh)
|
|
missing = needed - set(reader.fieldnames or [])
|
|
if missing:
|
|
raise ValueError(f"data_imu.csv missing columns: {sorted(missing)}")
|
|
for raw in reader:
|
|
if not raw["timestamp(ms)"].strip():
|
|
continue
|
|
rows.append(
|
|
ImuTelemetryRow(
|
|
timestamp_ms=float(raw["timestamp(ms)"]),
|
|
time_s=float(raw["Time"]),
|
|
xacc=int(float(raw["SCALED_IMU2.xacc"])),
|
|
yacc=int(float(raw["SCALED_IMU2.yacc"])),
|
|
zacc=int(float(raw["SCALED_IMU2.zacc"])),
|
|
vx_cms=float(raw["GLOBAL_POSITION_INT.vx"]),
|
|
vy_cms=float(raw["GLOBAL_POSITION_INT.vy"]),
|
|
vz_cms=float(raw["GLOBAL_POSITION_INT.vz"]),
|
|
)
|
|
)
|
|
return rows
|
|
|
|
|
|
def compute_attitude(row: ImuTelemetryRow) -> FrameAttitude:
|
|
"""Derive bank + pitch from accelerometer (gravity-as-down assumption).
|
|
|
|
SCALED_IMU2 acc components are in mg (milli-g). Sign convention:
|
|
body-frame +x forward, +y right, +z down. With dominant gravity on
|
|
+z the resting attitude has xacc=0, yacc=0, zacc=-1000 (negative
|
|
because the body frame measures the reaction force pointing UP).
|
|
|
|
pitch = atan2(-xacc, sqrt(yacc² + zacc²))
|
|
bank = atan2(yacc, zacc)
|
|
"""
|
|
x = float(row.xacc)
|
|
y = float(row.yacc)
|
|
z = float(row.zacc)
|
|
pitch_rad = math.atan2(-x, math.sqrt(y * y + z * z))
|
|
bank_rad = math.atan2(y, z)
|
|
# The atan2(y, z) convention puts level flight at ±π (since z is
|
|
# negative gravity); we want level = 0, so subtract π and wrap.
|
|
bank_deg_raw = math.degrees(bank_rad)
|
|
if bank_deg_raw > 90.0:
|
|
bank_deg = bank_deg_raw - 180.0
|
|
elif bank_deg_raw < -90.0:
|
|
bank_deg = bank_deg_raw + 180.0
|
|
else:
|
|
bank_deg = bank_deg_raw
|
|
return FrameAttitude(bank_deg=bank_deg, pitch_deg=math.degrees(pitch_rad))
|
|
|
|
|
|
def compute_translation_m(row: ImuTelemetryRow, prev_row: ImuTelemetryRow | None) -> float:
|
|
"""Ground-plane translation between consecutive frames in meters.
|
|
|
|
Uses the GLOBAL_POSITION_INT velocity (vx, vy in cm/s); vz is
|
|
excluded because vertical motion mostly affects scale, not overlap.
|
|
Per-frame dt = 1/30 s. With telemetry at 10 Hz, the same velocity
|
|
drives 3 consecutive frames.
|
|
"""
|
|
vx_ms = row.vx_cms / 100.0
|
|
vy_ms = row.vy_cms / 100.0
|
|
horizontal_speed = math.hypot(vx_ms, vy_ms)
|
|
dt_s = 1.0 / VIDEO_FPS
|
|
return horizontal_speed * dt_s
|
|
|
|
|
|
def compute_overlap_fraction(
|
|
translation_m: float, ground_footprint_m: float
|
|
) -> float:
|
|
"""Fraction of ground footprint that overlaps with the prior frame.
|
|
|
|
Approximation: assume a square ground footprint of side
|
|
``ground_footprint_m``. After translating by ``translation_m`` in
|
|
the horizontal plane, the overlap is
|
|
``max(0, 1 - translation_m / ground_footprint_m)``.
|
|
|
|
This is an upper bound — diagonal motion or rotation eats more
|
|
overlap. The ±10° attitude gate rules out the rotation-heavy
|
|
frames; pure translation is what survives, and this approximation
|
|
is tight for cruise flight.
|
|
"""
|
|
if ground_footprint_m <= 0:
|
|
raise ValueError(f"ground_footprint_m must be > 0, got {ground_footprint_m}")
|
|
fraction = 1.0 - translation_m / ground_footprint_m
|
|
return max(0.0, min(1.0, fraction))
|
|
|
|
|
|
def classify_frames(
|
|
imu_rows: Sequence[ImuTelemetryRow],
|
|
*,
|
|
attitude_limit_deg: float = ATTITUDE_LIMIT_DEG,
|
|
min_overlap_fraction: float = TARGET_OVERLAP_FRACTION,
|
|
ground_footprint_m: float = DEFAULT_GROUND_FOOTPRINT_M,
|
|
video_frames_per_imu_row: int = VIDEO_FRAMES_PER_IMU_ROW,
|
|
) -> list[FrameClassification]:
|
|
"""Build the per-video-frame classification list.
|
|
|
|
Each ``ImuTelemetryRow`` drives ``video_frames_per_imu_row``
|
|
consecutive video frames (3 frames per IMU row by default). Frame
|
|
indices are 0-based and contiguous.
|
|
|
|
Determinism: this function depends only on the input rows + tunables
|
|
— same input → same output.
|
|
"""
|
|
if attitude_limit_deg <= 0:
|
|
raise ValueError(f"attitude_limit_deg must be > 0, got {attitude_limit_deg}")
|
|
if not 0.0 < min_overlap_fraction < 1.0:
|
|
raise ValueError(
|
|
f"min_overlap_fraction must be in (0, 1), got {min_overlap_fraction}"
|
|
)
|
|
|
|
classifications: list[FrameClassification] = []
|
|
prev_row: ImuTelemetryRow | None = None
|
|
frame_index = 0
|
|
for imu_row_index, row in enumerate(imu_rows):
|
|
attitude = compute_attitude(row)
|
|
translation_m = compute_translation_m(row, prev_row)
|
|
overlap_fraction = compute_overlap_fraction(translation_m, ground_footprint_m)
|
|
attitude_ok = (
|
|
abs(attitude.bank_deg) <= attitude_limit_deg
|
|
and abs(attitude.pitch_deg) <= attitude_limit_deg
|
|
)
|
|
overlap_ok = overlap_fraction >= min_overlap_fraction
|
|
is_normal = attitude_ok and overlap_ok
|
|
if not attitude_ok:
|
|
reason = "attitude_exceeds_limit"
|
|
elif not overlap_ok:
|
|
reason = "overlap_below_threshold"
|
|
else:
|
|
reason = ""
|
|
for _ in range(video_frames_per_imu_row):
|
|
classifications.append(
|
|
FrameClassification(
|
|
frame_index=frame_index,
|
|
imu_row_index=imu_row_index,
|
|
attitude=attitude,
|
|
translation_m=translation_m,
|
|
overlap_fraction=overlap_fraction,
|
|
is_normal=is_normal,
|
|
excluded_reason=reason,
|
|
)
|
|
)
|
|
frame_index += 1
|
|
prev_row = row
|
|
return classifications
|
|
|
|
|
|
def compute_success_ratio(
|
|
classifications: Sequence[FrameClassification],
|
|
registration_success_by_frame: Mapping[int, bool],
|
|
) -> SuccessReport:
|
|
"""Compute the success ratio over the normal-frame denominator.
|
|
|
|
Parameters
|
|
----------
|
|
classifications : the per-frame classification list (output of
|
|
``classify_frames``).
|
|
registration_success_by_frame : dict[frame_index, bool] — populated
|
|
from ``NAMED_VALUE_FLOAT`` listener output or post-run FDR read.
|
|
Frames missing from this dict are excluded from the denominator
|
|
and counted in ``excluded_by_missing_metric`` (the SUT failed to
|
|
emit the metric — AC-2 of the AC-NEW-3 FDR-schema spec covers
|
|
the schema; this scenario complains if the metric is absent).
|
|
|
|
Returns
|
|
-------
|
|
SuccessReport
|
|
"""
|
|
success = 0
|
|
denominator = 0
|
|
excluded_by_attitude = 0
|
|
excluded_by_overlap = 0
|
|
excluded_by_missing_metric = 0
|
|
for cls in classifications:
|
|
if not cls.is_normal:
|
|
if cls.excluded_reason == "attitude_exceeds_limit":
|
|
excluded_by_attitude += 1
|
|
elif cls.excluded_reason == "overlap_below_threshold":
|
|
excluded_by_overlap += 1
|
|
continue
|
|
metric = registration_success_by_frame.get(cls.frame_index)
|
|
if metric is None:
|
|
excluded_by_missing_metric += 1
|
|
continue
|
|
denominator += 1
|
|
if metric:
|
|
success += 1
|
|
excluded_count = excluded_by_attitude + excluded_by_overlap + excluded_by_missing_metric
|
|
ratio = (success / denominator) if denominator > 0 else 0.0
|
|
return SuccessReport(
|
|
success_count=success,
|
|
denominator=denominator,
|
|
ratio=ratio,
|
|
excluded_count=excluded_count,
|
|
excluded_by_attitude=excluded_by_attitude,
|
|
excluded_by_overlap=excluded_by_overlap,
|
|
excluded_by_missing_metric=excluded_by_missing_metric,
|
|
)
|
|
|
|
|
|
def write_csv_evidence(
|
|
out_path: Path,
|
|
classifications: Iterable[FrameClassification],
|
|
registration_success_by_frame: Mapping[int, bool],
|
|
) -> Path:
|
|
"""Write the FT-P-04 per-frame evidence CSV.
|
|
|
|
Header: ``frame_index, imu_row_index, bank_deg, pitch_deg,
|
|
translation_m, overlap_fraction, is_normal, excluded_reason,
|
|
registration_success``.
|
|
"""
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with out_path.open("w", newline="") as fh:
|
|
writer = csv.writer(fh)
|
|
writer.writerow(
|
|
[
|
|
"frame_index",
|
|
"imu_row_index",
|
|
"bank_deg",
|
|
"pitch_deg",
|
|
"translation_m",
|
|
"overlap_fraction",
|
|
"is_normal",
|
|
"excluded_reason",
|
|
"registration_success",
|
|
]
|
|
)
|
|
for cls in classifications:
|
|
metric = registration_success_by_frame.get(cls.frame_index)
|
|
writer.writerow(
|
|
[
|
|
cls.frame_index,
|
|
cls.imu_row_index,
|
|
f"{cls.attitude.bank_deg:.3f}",
|
|
f"{cls.attitude.pitch_deg:.3f}",
|
|
f"{cls.translation_m:.4f}",
|
|
f"{cls.overlap_fraction:.4f}",
|
|
"true" if cls.is_normal else "false",
|
|
cls.excluded_reason,
|
|
"" if metric is None else ("true" if metric else "false"),
|
|
]
|
|
)
|
|
return out_path
|