"""Sharp-turn detection + FT-P-07 / FT-N-02 evaluation (AZ-414 / AC-3.2). The Derkachi telemetry CSV records ``SCALED_IMU2.{xgyro, ygyro, zgyro}`` in milli-degree/s. A "sharp turn" segment is identified by sustained high yaw rate (``|zgyro|``) — ≥``MIN_RUN_LENGTH`` consecutive IMU rows above ``DEFAULT_GYRO_Z_THRESHOLD_MDPS``. The threshold is read from ``AC32_SHARP_TURN_GYRO_Z_MDPS`` env var if set (AZ-414 spec note — "reads from the test-spec environment, not from a hardcoded constant"). FT-N-02 (during turn): * AC-2: source_label ∈ {visual_propagated, dead_reckoned}. * AC-3: cov_semi_major_m non-decreasing across consecutive frames inside the turn. FT-P-07 (recovery): * AC-4: next satellite_anchored emission within ≤3 frames after turn end. * AC-5: ``|propagated_centre_at_turn_end − recovery_anchor_centre| ≤ 200 m``. * AC-6: heading delta from pre-turn anchor to post-turn anchor handled in the [0°, 70°] envelope (recovery still occurs within the 3-frame budget). Synthetic-overlay fallback: if no natural Derkachi segment meets the threshold, the FT-P-07 / FT-N-02 scenarios fall back to a synthetic gyro overlay. The fallback decision is recorded in the FDR / CSV ``evidence_paths`` per the spec (this helper exposes a flag; the scenario writes the marker). Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv import math import os from dataclasses import dataclass, field from pathlib import Path from typing import Sequence from .geo import distance_m DEFAULT_GYRO_Z_THRESHOLD_MDPS = 30_000 # 30 °/s sustained yaw MIN_RUN_LENGTH = 3 # consecutive IMU rows above threshold SHARP_TURN_ENV_VAR = "AC32_SHARP_TURN_GYRO_Z_MDPS" MAX_RECOVERY_FRAMES = 3 MAX_RECOVERY_FRAMES_SAFETY_MS = 1100 # 3 frames @ ~3 fps, +100 ms slack MAX_RECOVERY_DRIFT_M = 200.0 MAX_HEADING_CHANGE_DEG = 70.0 ALLOWED_DURING_TURN_LABELS = {"visual_propagated", "dead_reckoned"} @dataclass(frozen=True) class GyroSample: """One IMU row's yaw-rate sample (z axis) — millidegree/s.""" monotonic_ms: int time_s: float zgyro_mdps: int @dataclass(frozen=True) class TurnSegment: """A contiguous run of ≥MIN_RUN_LENGTH samples above the gyro threshold.""" start_index: int end_index: int # inclusive — the last sample still above threshold start_ms: int end_ms: int peak_abs_zgyro_mdps: int sample_count: int @property def duration_ms(self) -> int: return self.end_ms - self.start_ms @dataclass(frozen=True) class TurnDetection: """Per-flight detection: zero or more segments + synthetic-overlay flag.""" segments: tuple[TurnSegment, ...] threshold_mdps: int synthetic_overlay: bool = False @property def has_natural_turn(self) -> bool: return len(self.segments) > 0 and not self.synthetic_overlay @dataclass(frozen=True) class TurnFrameSample: """One outbound estimate observed during replay. Same shape as ``multi_segment_evaluator.EstimateSample`` but with the additional ``cov_semi_major_m`` channel FT-N-02 AC-3 needs. """ monotonic_ms: int lat_deg: float lon_deg: float source_label: str cov_semi_major_m: float @dataclass(frozen=True) class FtN02WindowReport: """FT-N-02 per-window report (during turn).""" segment_index: int samples_inside: int label_violations: tuple[str, ...] cov_non_decreasing: bool first_decreasing_at_ms: int | None @property def passes_label(self) -> bool: return self.samples_inside > 0 and not self.label_violations @property def passes_cov(self) -> bool: return self.cov_non_decreasing @property def passes(self) -> bool: return self.passes_label and self.passes_cov @dataclass(frozen=True) class FtP07WindowReport: """FT-P-07 per-window report (recovery).""" segment_index: int recovery_anchor_ms: int | None recovery_lag_ms: int | None drift_m: float | None heading_change_deg: float | None in_heading_envelope: bool @property def passes_recovery_lag(self) -> bool: return ( self.recovery_lag_ms is not None and self.recovery_lag_ms <= MAX_RECOVERY_FRAMES_SAFETY_MS ) @property def passes_drift(self) -> bool: return self.drift_m is not None and self.drift_m <= MAX_RECOVERY_DRIFT_M @property def passes_heading(self) -> bool: return self.in_heading_envelope @property def passes(self) -> bool: return self.passes_recovery_lag and self.passes_drift and self.passes_heading def get_threshold_mdps() -> int: """Read the AC-3.2 threshold from env, defaulting to the project value.""" raw = os.environ.get(SHARP_TURN_ENV_VAR) if raw is None or raw.strip() == "": return DEFAULT_GYRO_Z_THRESHOLD_MDPS try: value = int(raw) except ValueError: raise ValueError( f"{SHARP_TURN_ENV_VAR}={raw!r} is not a valid int (millidegree/s)" ) if value <= 0: raise ValueError(f"{SHARP_TURN_ENV_VAR}={raw!r} must be > 0") return value def load_zgyro_samples(csv_path: Path) -> list[GyroSample]: """Read ``data_imu.csv`` and return per-row yaw-rate samples.""" if not csv_path.exists(): raise FileNotFoundError( f"data_imu.csv not found at {csv_path} — bind-mount the Derkachi fixture" ) samples: list[GyroSample] = [] with csv_path.open() as fh: reader = csv.DictReader(fh) if "SCALED_IMU2.zgyro" not in (reader.fieldnames or []): raise ValueError( "data_imu.csv missing required column SCALED_IMU2.zgyro" ) for row in reader: ts = row.get("timestamp(ms)", "").strip() if not ts: continue samples.append( GyroSample( monotonic_ms=int(float(ts)), time_s=float(row["Time"]), zgyro_mdps=int(float(row["SCALED_IMU2.zgyro"])), ) ) return samples def detect_turn_segments( samples: Sequence[GyroSample], *, threshold_mdps: int | None = None, min_run_length: int = MIN_RUN_LENGTH, ) -> TurnDetection: """Find contiguous runs of |zgyro| ≥ threshold lasting ≥min_run_length samples.""" if min_run_length < 1: raise ValueError(f"min_run_length must be ≥1, got {min_run_length}") threshold = threshold_mdps if threshold_mdps is not None else get_threshold_mdps() segments: list[TurnSegment] = [] run_start: int | None = None run_peak = 0 for i, s in enumerate(samples): abs_z = abs(s.zgyro_mdps) if abs_z >= threshold: if run_start is None: run_start = i run_peak = abs_z else: run_peak = max(run_peak, abs_z) else: if run_start is not None: run_len = i - run_start if run_len >= min_run_length: seg_end = i - 1 segments.append( TurnSegment( start_index=run_start, end_index=seg_end, start_ms=samples[run_start].monotonic_ms, end_ms=samples[seg_end].monotonic_ms, peak_abs_zgyro_mdps=run_peak, sample_count=run_len, ) ) run_start = None run_peak = 0 # Tail run. if run_start is not None: run_len = len(samples) - run_start if run_len >= min_run_length: seg_end = len(samples) - 1 segments.append( TurnSegment( start_index=run_start, end_index=seg_end, start_ms=samples[run_start].monotonic_ms, end_ms=samples[seg_end].monotonic_ms, peak_abs_zgyro_mdps=run_peak, sample_count=run_len, ) ) return TurnDetection( segments=tuple(segments), threshold_mdps=threshold, synthetic_overlay=False, ) def synthesize_overlay_segment( samples: Sequence[GyroSample], *, threshold_mdps: int, anchor_fraction: float = 0.5, duration_samples: int = MIN_RUN_LENGTH * 2, ) -> TurnDetection: """Construct a synthetic-overlay turn anchored at ``anchor_fraction`` of the flight. The scenario only needs the segment's time bounds to evaluate the SUT's behaviour — the overlay does NOT modify the IMU stream (that's an injector concern). The detection result is marked ``synthetic_overlay=True`` so the scenario writes the flag into the evidence CSV. """ if not samples: raise ValueError("samples must not be empty") if not 0.0 <= anchor_fraction <= 1.0: raise ValueError(f"anchor_fraction must be in [0, 1], got {anchor_fraction}") if duration_samples < MIN_RUN_LENGTH: raise ValueError( f"duration_samples must be ≥{MIN_RUN_LENGTH}, got {duration_samples}" ) anchor_idx = int(anchor_fraction * (len(samples) - 1)) end_idx = min(len(samples) - 1, anchor_idx + duration_samples - 1) seg = TurnSegment( start_index=anchor_idx, end_index=end_idx, start_ms=samples[anchor_idx].monotonic_ms, end_ms=samples[end_idx].monotonic_ms, peak_abs_zgyro_mdps=threshold_mdps, # synthesised; not natural sample_count=end_idx - anchor_idx + 1, ) return TurnDetection( segments=(seg,), threshold_mdps=threshold_mdps, synthetic_overlay=True, ) def detect_or_synthesize(csv_path: Path) -> TurnDetection: """Convenience: detect a natural turn; if none, synthesise an overlay. The scenario uses this to keep the FT-P-07 / FT-N-02 paths covered on the natural fixture even when the data doesn't naturally include a sharp-turn segment. """ samples = load_zgyro_samples(csv_path) natural = detect_turn_segments(samples) if natural.has_natural_turn: return natural return synthesize_overlay_segment(samples, threshold_mdps=natural.threshold_mdps) def _heading_change_deg( pre_lat: float, pre_lon: float, mid_lat: float, mid_lon: float, post_lat: float, post_lon: float, ) -> float: """Heading delta from (pre→mid) to (mid→post), wrapped to [0, 180].""" def bearing(a_lat: float, a_lon: float, b_lat: float, b_lon: float) -> float: from .geo import delta return delta(a_lat, a_lon, b_lat, b_lon).forward_bearing_deg pre_bearing = bearing(pre_lat, pre_lon, mid_lat, mid_lon) post_bearing = bearing(mid_lat, mid_lon, post_lat, post_lon) diff = (post_bearing - pre_bearing) % 360.0 if diff > 180.0: diff = 360.0 - diff return diff def evaluate_ft_n_02( segment: TurnSegment, segment_index: int, samples: Sequence[TurnFrameSample], ) -> FtN02WindowReport: """FT-N-02 per-window evaluation (during-turn label + monotonic covariance).""" inside = [s for s in samples if segment.start_ms <= s.monotonic_ms <= segment.end_ms] label_violations = tuple( sorted({s.source_label for s in inside if s.source_label not in ALLOWED_DURING_TURN_LABELS}) ) cov_non_decreasing = True first_decreasing: int | None = None prev_cov = -math.inf for s in inside: if s.cov_semi_major_m < prev_cov: cov_non_decreasing = False first_decreasing = s.monotonic_ms break prev_cov = s.cov_semi_major_m return FtN02WindowReport( segment_index=segment_index, samples_inside=len(inside), label_violations=label_violations, cov_non_decreasing=cov_non_decreasing, first_decreasing_at_ms=first_decreasing, ) def evaluate_ft_p_07( segment: TurnSegment, segment_index: int, samples: Sequence[TurnFrameSample], ) -> FtP07WindowReport: """FT-P-07 per-window evaluation (recovery anchor, drift, heading).""" # Pre-turn anchor: last satellite_anchored at or before start_ms. pre_anchor: TurnFrameSample | None = None for s in samples: if s.monotonic_ms <= segment.start_ms and s.source_label == "satellite_anchored": pre_anchor = s elif s.monotonic_ms > segment.start_ms: break # Last inside-window sample (propagated centre at turn end). inside = [s for s in samples if segment.start_ms <= s.monotonic_ms <= segment.end_ms] propagated_at_end = inside[-1] if inside else None # Recovery: first satellite_anchored after end_ms. recovery: TurnFrameSample | None = None for s in samples: if s.monotonic_ms > segment.end_ms and s.source_label == "satellite_anchored": recovery = s break if recovery is None: return FtP07WindowReport( segment_index=segment_index, recovery_anchor_ms=None, recovery_lag_ms=None, drift_m=None, heading_change_deg=None, in_heading_envelope=False, ) recovery_lag = recovery.monotonic_ms - segment.end_ms drift: float | None = None if propagated_at_end is not None: drift = distance_m( propagated_at_end.lat_deg, propagated_at_end.lon_deg, recovery.lat_deg, recovery.lon_deg, ) heading_change: float | None = None in_envelope = True if pre_anchor is not None and propagated_at_end is not None: heading_change = _heading_change_deg( pre_anchor.lat_deg, pre_anchor.lon_deg, propagated_at_end.lat_deg, propagated_at_end.lon_deg, recovery.lat_deg, recovery.lon_deg, ) in_envelope = heading_change <= MAX_HEADING_CHANGE_DEG return FtP07WindowReport( segment_index=segment_index, recovery_anchor_ms=recovery.monotonic_ms, recovery_lag_ms=recovery_lag, drift_m=drift, heading_change_deg=heading_change, in_heading_envelope=in_envelope, ) def write_csv_evidence( out_path: Path, detection: TurnDetection, n02_reports: Sequence[FtN02WindowReport], p07_reports: Sequence[FtP07WindowReport], ) -> Path: """Write FT-P-07 + FT-N-02 combined evidence CSV. The ``synthetic_overlay`` column marks the fallback case per AC-1. """ out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "segment_index", "start_ms", "end_ms", "peak_abs_zgyro_mdps", "synthetic_overlay", # FT-N-02 columns "samples_inside", "label_violations", "cov_non_decreasing", # FT-P-07 columns "recovery_lag_ms", "drift_m", "heading_change_deg", "in_heading_envelope", "passes_ft_n_02", "passes_ft_p_07", ] ) for seg, n02, p07 in zip(detection.segments, n02_reports, p07_reports): writer.writerow( [ n02.segment_index, seg.start_ms, seg.end_ms, seg.peak_abs_zgyro_mdps, "true" if detection.synthetic_overlay else "false", n02.samples_inside, "|".join(n02.label_violations) if n02.label_violations else "", "true" if n02.cov_non_decreasing else "false", "" if p07.recovery_lag_ms is None else p07.recovery_lag_ms, "" if p07.drift_m is None else f"{p07.drift_m:.3f}", "" if p07.heading_change_deg is None else f"{p07.heading_change_deg:.3f}", "true" if p07.in_heading_envelope else "false", "true" if n02.passes else "false", "true" if p07.passes else "false", ] ) return out_path