"""Tests for the AZ-894 CSV-driven IMU + GPS ground-truth extractor.

Acceptance criteria exercised here, as defined in
``_docs/02_tasks/todo/AZ-894_csv_driven_replay_adapter.md``:

* AC-1 — IMU and GPS samples are parsed in pairs onto a single monotonic
  clock. The spec text quotes 4,899 + 4,899 samples; the committed
  Derkachi fixture actually holds 4,900 data rows (see the AC-1 test).
* AC-5 — schema faults surface as a clear ``ReplayInputAdapterError`` at
  startup.

The happy-path test is skipped unless the committed Derkachi fixture
(``_docs/00_problem/input_data/flight_derkachi/data_imu.csv``) is present.
Schema-fault tests write synthetic CSV text under ``tmp_path`` so they stay
deterministic and never depend on that fixture.

Style: every test follows the Arrange / Act / Assert pattern.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from gps_denied_onboard.replay_input.csv_ground_truth import (
    CSV_SOURCE_LABEL,
    REQUIRED_COLUMNS,
    load_csv_ground_truth,
)
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError

# Directory that contains ``_docs``: ``parents[3]`` of this file's resolved
# path. Both committed data files below are addressed relative to it.
_DOCS_PARENT: Path = Path(__file__).resolve().parents[3]

_DERKACHI_CSV: Path = _DOCS_PARENT.joinpath(
    "_docs", "00_problem", "input_data", "flight_derkachi", "data_imu.csv"
)

_EXAMPLE_CSV: Path = _DOCS_PARENT.joinpath(
    "_docs", "02_document", "contracts", "replay", "example_data_imu.csv"
)


# ---------------------------------------------------------------------
# Header + minimal-row helpers


def _write_csv(path: Path, header: str, rows: list[str]) -> Path:
    """Write ``header`` followed by ``rows`` (one per line) and return ``path``.

    The file is UTF-8 encoded and always ends with a newline.
    """
    body = "\n".join(rows)
    path.write_text(f"{header}\n{body}\n", encoding="utf-8")
    return path


def _full_header() -> str:
    """Return the comma-joined header built from ``REQUIRED_COLUMNS``."""
    columns = REQUIRED_COLUMNS
    return ",".join(columns)


def _row(time_s: float, *, prefix_ms: float = 0.0) -> str:
    """Return one 15-field CSV data row, in ``REQUIRED_COLUMNS`` order.

    Only the first two fields vary (``prefix_ms`` then ``time_s``). The
    remaining thirteen are fixed, float-parseable values; their exact
    magnitudes do not matter for these tests because the loader only
    validates parseability + range.
    """
    leading = (str(prefix_ms), str(time_s))
    fixed_tail = (
        "10",
        "-3",
        "-980",
        "50",
        "30",
        "-5",
        "50.0809634",
        "36.1115442",
        "141290",
        "-4",
        "-6",
        "-88",
        "35041",
    )
    return ",".join(leading + fixed_tail)


# ---------------------------------------------------------------------
# AC-1: happy path on the real Derkachi CSV


@pytest.mark.skipif(
    not _DERKACHI_CSV.is_file(),
    reason="Derkachi fixture data_imu.csv not present",
)
def test_ac1_loads_derkachi_csv_emits_paired_samples() -> None:
    # Arrange — the committed fixture needs no setup, only expectations.
    # The AZ-894 spec says "4,899 samples", but the fixture runs from
    # Time=0.0 to 489.9 s in 0.1 s steps, i.e. 4,900 rows. Pinning the
    # concrete count catches truncation; the first/last timestamp checks
    # pin the span itself.
    expected_count = 4900
    expected_last_ts_ns = int(489.9 * 1e9)

    # Act
    loaded = load_csv_ground_truth(_DERKACHI_CSV)

    # Assert
    assert loaded.source == CSV_SOURCE_LABEL
    assert len(loaded.records) == expected_count
    assert len(loaded.imu_samples) == expected_count
    # The fixture starts at Time=0 and ends at 489.9 s (10 Hz).
    assert loaded.records[0].ts_ns == 0
    assert loaded.records[-1].ts_ns == expected_last_ts_ns
    # Every IMU sample sits on the same canonical clock as its GPS record.
    for gps_record, imu_sample in zip(
        loaded.records, loaded.imu_samples, strict=True
    ):
        assert gps_record.ts_ns == imu_sample.ts_ns


# ---------------------------------------------------------------------
# AZ-896 AC-3: the shipped example CSV stays parser-clean


@pytest.mark.skipif(
    not _EXAMPLE_CSV.is_file(),
    reason="AZ-896 example_data_imu.csv not present",
)
def test_az896_example_csv_loads_clean() -> None:
    # Arrange — the committed AZ-896 example needs no setup.

    # Act
    loaded = load_csv_ground_truth(_EXAMPLE_CSV)

    # Assert
    record_count = len(loaded.records)
    assert loaded.source == CSV_SOURCE_LABEL
    assert record_count >= 10
    assert record_count == len(loaded.imu_samples)
    assert loaded.records[0].ts_ns == 0


# ---------------------------------------------------------------------
# AC-1 (small fixture): paired-sample invariants


def test_paired_imu_and_gps_share_clock(tmp_path: Path) -> None:
    # Arrange — three rows, 0.1 s apart, each with a large prefix value.
    time_and_prefix = (
        (0.0, 4551116.348),
        (0.1, 4551216.348),
        (0.2, 4551316.348),
    )
    data_rows = [
        _row(time_s, prefix_ms=prefix_ms)
        for time_s, prefix_ms in time_and_prefix
    ]
    csv_path = _write_csv(tmp_path / "ok.csv", _full_header(), data_rows)

    # Act
    loaded = load_csv_ground_truth(csv_path)

    # Assert
    expected_ns = [0, 100_000_000, 200_000_000]
    assert len(loaded.records) == 3
    assert len(loaded.imu_samples) == 3
    assert [record.ts_ns for record in loaded.records] == expected_ns
    assert [sample.ts_ns for sample in loaded.imu_samples] == expected_ns


def test_gps_unit_conversion(tmp_path: Path) -> None:
    # Arrange — the values exercise the deg/mm/cm-s/cdeg conversions on the
    # GPS columns and the mG/mrad-s + FRD→FLU conversion on the IMU columns
    # (AZ-918).
    clock_fields = ["0.0", "0.0"]
    imu_raw_fields = ["10", "-3", "-980", "50", "30", "-5"]  # mG/mrad·s⁻¹/FRD
    gps_raw_fields = [
        "50.0809634",  # lat already in degrees
        "36.1115442",  # lon already in degrees
        "141290",  # alt in mm → 141.290 m
        "-400",  # vx in cm/s → -4.0 m/s
        "600",  # vy in cm/s → 6.0 m/s
        "-88",  # vz in cm/s → -0.88 m/s
        "35041",  # hdg in cdeg → 350.41 deg
    ]
    data_row = ",".join(clock_fields + imu_raw_fields + gps_raw_fields)
    csv_path = _write_csv(tmp_path / "units.csv", _full_header(), [data_row])

    # Act
    loaded = load_csv_ground_truth(csv_path)

    # Assert — GPS in SI / decimal-degrees.
    gps_fix = loaded.records[0]
    assert gps_fix.lat_deg == pytest.approx(50.0809634)
    assert gps_fix.lon_deg == pytest.approx(36.1115442)
    assert gps_fix.alt_m == pytest.approx(141.290)
    assert gps_fix.vx_m_s == pytest.approx(-4.0)
    assert gps_fix.vy_m_s == pytest.approx(6.0)
    assert gps_fix.vz_m_s == pytest.approx(-0.88)
    assert gps_fix.hdg_deg == pytest.approx(350.41)

    # Assert — IMU converted to m/s² + rad/s, body frame FLU.
    # AZ-918: the CSV carries MAVLink wire format (mG/mrad/s, FRD body) and
    # the parser routes it through mavlink_imu_to_si_flu so consumers see
    # SI/FLU. FRD→FLU negates Y and Z, so raw -3 (yacc) / -980 (zacc)
    # come out as +3 / +980.
    imu_sample = loaded.imu_samples[0]
    mg_to_m_s2 = 9.80665e-3
    mrad_to_rad = 1.0e-3
    expected_accel = (10 * mg_to_m_s2, 3 * mg_to_m_s2, 980 * mg_to_m_s2)
    expected_gyro = (50 * mrad_to_rad, -30 * mrad_to_rad, 5 * mrad_to_rad)
    assert imu_sample.accel_xyz == pytest.approx(expected_accel)
    assert imu_sample.gyro_xyz == pytest.approx(expected_gyro)


# ---------------------------------------------------------------------
# AC-5: schema faults raise ReplayInputAdapterError at startup


def test_ac5_file_not_found_raises(tmp_path: Path) -> None:
    # Arrange — a path under tmp_path that is never created.
    absent_path = tmp_path / "absent.csv"

    # Act + Assert
    with pytest.raises(ReplayInputAdapterError, match="CSV file not found"):
        load_csv_ground_truth(absent_path)


def test_ac5_missing_required_column_raises(tmp_path: Path) -> None:
    # Arrange — the header omits one required column; the single data row
    # carries one field fewer to match.
    kept_columns = [
        column for column in REQUIRED_COLUMNS if column != "SCALED_IMU2.xacc"
    ]
    csv_path = _write_csv(
        tmp_path / "missing_col.csv",
        ",".join(kept_columns),
        ["0,0,-3,-980,50,30,-5,50.0,36.0,141290,-4,-6,-88,35041"],
    )

    # Act + Assert
    with pytest.raises(
        ReplayInputAdapterError, match="missing required columns"
    ):
        load_csv_ground_truth(csv_path)


def test_ac5_nan_in_time_raises(tmp_path: Path) -> None:
    # Arrange — a valid first row followed by a row whose Time is NaN.
    data_rows = [_row(0.0), _row(float("nan"))]
    csv_path = _write_csv(
        tmp_path / "nan_time.csv", _full_header(), data_rows
    )

    # Act + Assert
    with pytest.raises(ReplayInputAdapterError, match="Time=.*is NaN/Inf"):
        load_csv_ground_truth(csv_path)


def test_ac5_non_monotonic_time_raises(tmp_path: Path) -> None:
    # Arrange — the second row's Time goes backwards.
    data_rows = [_row(0.1), _row(0.0)]
    csv_path = _write_csv(
        tmp_path / "non_monotonic.csv", _full_header(), data_rows
    )

    # Act + Assert
    with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"):
        load_csv_ground_truth(csv_path)


def test_ac5_repeated_time_also_non_monotonic(tmp_path: Path) -> None:
    # Arrange — equal timestamps still violate strict monotonicity, so the
    # preintegrator never gets fed a zero-delta window.
    data_rows = [_row(0.0), _row(0.0)]
    csv_path = _write_csv(
        tmp_path / "repeated.csv", _full_header(), data_rows
    )

    # Act + Assert
    with pytest.raises(ReplayInputAdapterError, match="non-monotonic Time"):
        load_csv_ground_truth(csv_path)


def test_ac5_non_numeric_imu_value_raises(tmp_path: Path) -> None:
    # Arrange — a non-parseable token sits in the first IMU column.
    row_fields = [
        "0.0",
        "0.0",
        "not-a-number",  # SCALED_IMU2.xacc
        "-3",
        "-980",
        "50",
        "30",
        "-5",
        "50.0",
        "36.0",
        "141290",
        "-4",
        "-6",
        "-88",
        "35041",
    ]
    csv_path = _write_csv(
        tmp_path / "bad_imu.csv", _full_header(), [",".join(row_fields)]
    )

    # Act + Assert
    with pytest.raises(
        ReplayInputAdapterError,
        match=r"SCALED_IMU2\.xacc=.*is not a number",
    ):
        load_csv_ground_truth(csv_path)


def test_ac5_header_only_raises(tmp_path: Path) -> None:
    # Arrange — a file holding the header line and no data rows.
    csv_path = tmp_path / "header_only.csv"
    csv_path.write_text(f"{_full_header()}\n", encoding="utf-8")

    # Act + Assert
    with pytest.raises(ReplayInputAdapterError, match="no data rows"):
        load_csv_ground_truth(csv_path)