"""Normal-segment classification + success-ratio for FT-P-04 (AZ-412 / AC-2.1a). The SUT exposes a per-frame ``registration_success`` boolean (either via ``NAMED_VALUE_FLOAT`` MAVLink messages or via the post-run FDR archive). This helper: 1. Reads the Derkachi ``data_imu.csv`` (SCALED_IMU2 + GLOBAL_POSITION_INT columns) and derives a per-row attitude approximation from accelerometer readings (the spec's AC-1 explicitly says attitude is ``SCALED_IMU2``-derived, NOT internal SUT state). 2. Classifies each video frame as **normal** when both: * bank/pitch within ±10° of nadir, AND * inferred prior-frame overlap ≥40 % (heuristic from translation magnitude). 3. Computes the success ratio over the **normal** set only — sharp-turn frames are excluded from the denominator per AC-3. 4. Asserts the ratio meets the AC-2 budget (≥0.95). The video is 30 fps; the IMU/telemetry CSV is 10 Hz (one row per 100 ms, i.e. 3 video frames per row). The classifier expands each telemetry row to 3 video-frame indices (the same row drives 3 consecutive frames). Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv import math from dataclasses import dataclass, field from pathlib import Path from typing import Iterable, Mapping, Sequence ATTITUDE_LIMIT_DEG = 10.0 TARGET_OVERLAP_FRACTION = 0.40 SUCCESS_RATIO_REQUIRED = 0.95 VIDEO_FPS = 30 IMU_HZ = 10 VIDEO_FRAMES_PER_IMU_ROW = VIDEO_FPS // IMU_HZ # Derkachi nadir camera: the camera_info.md fixture records ~141 m altitude # AGL and ~55° horizontal FOV. The "ground footprint width" at nadir is # 2 * alt * tan(FOV/2) ≈ 2 * 141 * tan(27.5°) ≈ 147 m. We use a single # scenario-wide ground footprint to keep the heuristic transparent. DEFAULT_GROUND_FOOTPRINT_M = 147.0 @dataclass(frozen=True) class ImuTelemetryRow: """One row of ``data_imu.csv`` distilled to the columns the classifier needs. Velocity fields are floats (cm/s) because the shipped ``data_imu.csv`` stores them in scientific notation (e.g. ``-4.44E-16`` near hover). Acceleration fields stay int per the SCALED_IMU2 wire format. """ timestamp_ms: float time_s: float xacc: int yacc: int zacc: int vx_cms: float vy_cms: float vz_cms: float @dataclass(frozen=True) class FrameAttitude: """Bank + pitch derived from accel; used for the ±10° gate. Accel-only attitude assumes the platform is in near-equilibrium flight (the dominant accel is gravity). Valid for the cropped nadir-cruise segments AC-2.1a targets; explicitly NOT valid during aggressive manoeuvres — but those are exactly the frames AC-2.1a wants to EXCLUDE from the denominator. So the limitation matches the AC intent. """ bank_deg: float pitch_deg: float @dataclass(frozen=True) class FrameClassification: """Per-video-frame classification used to build the FT-P-04 denominator.""" frame_index: int imu_row_index: int attitude: FrameAttitude translation_m: float overlap_fraction: float is_normal: bool excluded_reason: str = "" @dataclass(frozen=True) class SuccessReport: """Aggregate report consumed by the scenario assertion. ``ratio = success_count / denominator`` where ``denominator`` is the count of normal frames (sharp-turn / low-overlap frames are excluded per AC-3 — they are counted in ``excluded_count`` for diagnostic clarity). """ success_count: int denominator: int ratio: float excluded_count: int excluded_by_attitude: int excluded_by_overlap: int excluded_by_missing_metric: int ratio_required: float = SUCCESS_RATIO_REQUIRED @property def passes(self) -> bool: return self.denominator > 0 and self.ratio >= self.ratio_required def load_imu_telemetry(csv_path: Path) -> list[ImuTelemetryRow]: """Read ``data_imu.csv`` and return one row per non-blank entry. Only the columns the classifier needs are kept. Other columns are ignored to keep the classifier independent of upstream column churn. """ if not csv_path.exists(): raise FileNotFoundError( f"data_imu.csv not found at {csv_path} — bind-mount the Derkachi fixture" ) needed = { "timestamp(ms)", "Time", "SCALED_IMU2.xacc", "SCALED_IMU2.yacc", "SCALED_IMU2.zacc", "GLOBAL_POSITION_INT.vx", "GLOBAL_POSITION_INT.vy", "GLOBAL_POSITION_INT.vz", } rows: list[ImuTelemetryRow] = [] with csv_path.open() as fh: reader = csv.DictReader(fh) missing = needed - set(reader.fieldnames or []) if missing: raise ValueError(f"data_imu.csv missing columns: {sorted(missing)}") for raw in reader: if not raw["timestamp(ms)"].strip(): continue rows.append( ImuTelemetryRow( timestamp_ms=float(raw["timestamp(ms)"]), time_s=float(raw["Time"]), xacc=int(float(raw["SCALED_IMU2.xacc"])), yacc=int(float(raw["SCALED_IMU2.yacc"])), zacc=int(float(raw["SCALED_IMU2.zacc"])), vx_cms=float(raw["GLOBAL_POSITION_INT.vx"]), vy_cms=float(raw["GLOBAL_POSITION_INT.vy"]), vz_cms=float(raw["GLOBAL_POSITION_INT.vz"]), ) ) return rows def compute_attitude(row: ImuTelemetryRow) -> FrameAttitude: """Derive bank + pitch from accelerometer (gravity-as-down assumption). SCALED_IMU2 acc components are in mg (milli-g). Sign convention: body-frame +x forward, +y right, +z down. With dominant gravity on +z the resting attitude has xacc=0, yacc=0, zacc=-1000 (negative because the body frame measures the reaction force pointing UP). pitch = atan2(-xacc, sqrt(yacc² + zacc²)) bank = atan2(yacc, zacc) """ x = float(row.xacc) y = float(row.yacc) z = float(row.zacc) pitch_rad = math.atan2(-x, math.sqrt(y * y + z * z)) bank_rad = math.atan2(y, z) # The atan2(y, z) convention puts level flight at ±π (since z is # negative gravity); we want level = 0, so subtract π and wrap. bank_deg_raw = math.degrees(bank_rad) if bank_deg_raw > 90.0: bank_deg = bank_deg_raw - 180.0 elif bank_deg_raw < -90.0: bank_deg = bank_deg_raw + 180.0 else: bank_deg = bank_deg_raw return FrameAttitude(bank_deg=bank_deg, pitch_deg=math.degrees(pitch_rad)) def compute_translation_m(row: ImuTelemetryRow, prev_row: ImuTelemetryRow | None) -> float: """Ground-plane translation between consecutive frames in meters. Uses the GLOBAL_POSITION_INT velocity (vx, vy in cm/s); vz is excluded because vertical motion mostly affects scale, not overlap. Per-frame dt = 1/30 s. With telemetry at 10 Hz, the same velocity drives 3 consecutive frames. """ vx_ms = row.vx_cms / 100.0 vy_ms = row.vy_cms / 100.0 horizontal_speed = math.hypot(vx_ms, vy_ms) dt_s = 1.0 / VIDEO_FPS return horizontal_speed * dt_s def compute_overlap_fraction( translation_m: float, ground_footprint_m: float ) -> float: """Fraction of ground footprint that overlaps with the prior frame. Approximation: assume a square ground footprint of side ``ground_footprint_m``. After translating by ``translation_m`` in the horizontal plane, the overlap is ``max(0, 1 - translation_m / ground_footprint_m)``. This is an upper bound — diagonal motion or rotation eats more overlap. The ±10° attitude gate rules out the rotation-heavy frames; pure translation is what survives, and this approximation is tight for cruise flight. """ if ground_footprint_m <= 0: raise ValueError(f"ground_footprint_m must be > 0, got {ground_footprint_m}") fraction = 1.0 - translation_m / ground_footprint_m return max(0.0, min(1.0, fraction)) def classify_frames( imu_rows: Sequence[ImuTelemetryRow], *, attitude_limit_deg: float = ATTITUDE_LIMIT_DEG, min_overlap_fraction: float = TARGET_OVERLAP_FRACTION, ground_footprint_m: float = DEFAULT_GROUND_FOOTPRINT_M, video_frames_per_imu_row: int = VIDEO_FRAMES_PER_IMU_ROW, ) -> list[FrameClassification]: """Build the per-video-frame classification list. Each ``ImuTelemetryRow`` drives ``video_frames_per_imu_row`` consecutive video frames (3 frames per IMU row by default). Frame indices are 0-based and contiguous. Determinism: this function depends only on the input rows + tunables — same input → same output. """ if attitude_limit_deg <= 0: raise ValueError(f"attitude_limit_deg must be > 0, got {attitude_limit_deg}") if not 0.0 < min_overlap_fraction < 1.0: raise ValueError( f"min_overlap_fraction must be in (0, 1), got {min_overlap_fraction}" ) classifications: list[FrameClassification] = [] prev_row: ImuTelemetryRow | None = None frame_index = 0 for imu_row_index, row in enumerate(imu_rows): attitude = compute_attitude(row) translation_m = compute_translation_m(row, prev_row) overlap_fraction = compute_overlap_fraction(translation_m, ground_footprint_m) attitude_ok = ( abs(attitude.bank_deg) <= attitude_limit_deg and abs(attitude.pitch_deg) <= attitude_limit_deg ) overlap_ok = overlap_fraction >= min_overlap_fraction is_normal = attitude_ok and overlap_ok if not attitude_ok: reason = "attitude_exceeds_limit" elif not overlap_ok: reason = "overlap_below_threshold" else: reason = "" for _ in range(video_frames_per_imu_row): classifications.append( FrameClassification( frame_index=frame_index, imu_row_index=imu_row_index, attitude=attitude, translation_m=translation_m, overlap_fraction=overlap_fraction, is_normal=is_normal, excluded_reason=reason, ) ) frame_index += 1 prev_row = row return classifications def compute_success_ratio( classifications: Sequence[FrameClassification], registration_success_by_frame: Mapping[int, bool], ) -> SuccessReport: """Compute the success ratio over the normal-frame denominator. Parameters ---------- classifications : the per-frame classification list (output of ``classify_frames``). registration_success_by_frame : dict[frame_index, bool] — populated from ``NAMED_VALUE_FLOAT`` listener output or post-run FDR read. Frames missing from this dict are excluded from the denominator and counted in ``excluded_by_missing_metric`` (the SUT failed to emit the metric — AC-2 of the AC-NEW-3 FDR-schema spec covers the schema; this scenario complains if the metric is absent). Returns ------- SuccessReport """ success = 0 denominator = 0 excluded_by_attitude = 0 excluded_by_overlap = 0 excluded_by_missing_metric = 0 for cls in classifications: if not cls.is_normal: if cls.excluded_reason == "attitude_exceeds_limit": excluded_by_attitude += 1 elif cls.excluded_reason == "overlap_below_threshold": excluded_by_overlap += 1 continue metric = registration_success_by_frame.get(cls.frame_index) if metric is None: excluded_by_missing_metric += 1 continue denominator += 1 if metric: success += 1 excluded_count = excluded_by_attitude + excluded_by_overlap + excluded_by_missing_metric ratio = (success / denominator) if denominator > 0 else 0.0 return SuccessReport( success_count=success, denominator=denominator, ratio=ratio, excluded_count=excluded_count, excluded_by_attitude=excluded_by_attitude, excluded_by_overlap=excluded_by_overlap, excluded_by_missing_metric=excluded_by_missing_metric, ) def write_csv_evidence( out_path: Path, classifications: Iterable[FrameClassification], registration_success_by_frame: Mapping[int, bool], ) -> Path: """Write the FT-P-04 per-frame evidence CSV. Header: ``frame_index, imu_row_index, bank_deg, pitch_deg, translation_m, overlap_fraction, is_normal, excluded_reason, registration_success``. """ out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "frame_index", "imu_row_index", "bank_deg", "pitch_deg", "translation_m", "overlap_fraction", "is_normal", "excluded_reason", "registration_success", ] ) for cls in classifications: metric = registration_success_by_frame.get(cls.frame_index) writer.writerow( [ cls.frame_index, cls.imu_row_index, f"{cls.attitude.bank_deg:.3f}", f"{cls.attitude.pitch_deg:.3f}", f"{cls.translation_m:.4f}", f"{cls.overlap_fraction:.4f}", "true" if cls.is_normal else "false", cls.excluded_reason, "" if metric is None else ("true" if metric else "false"), ] ) return out_path