Files
gps-denied-onboard/e2e/tests/positive/test_ft_p_02_derkachi_drift.py
Oleksandr Bezdieniezhnykh f49d803252 [AZ-597] Batch 77: replay_mode helpers + 13 scenario stub rewires
Add `runner/helpers/replay_mode.py` (NullFrameSink, NullFcInboundEmitter,
default_frame_period_ms, load_replay_json, resolve_replay_subdir,
imu_replay_noop) and rewire all 13 scenarios off their local
`_resolve_*` / `_drive_*` / `_push_*` NotImplementedError stubs.

Closes the offline FDR-replay execution path. `grep raise
NotImplementedError` under `e2e/tests/` now returns zero matches. +17
unit tests (626 total, up from 608). Unit-test behaviour unchanged
(scenarios still skip via b75 sitl_replay_ready gate when
E2E_SITL_REPLAY_DIR is unset).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 09:52:05 +03:00

160 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""FT-P-02 — Cumulative drift between satellite anchors on Derkachi (AC-1.3).
The full scenario:
1. Replay the Derkachi MP4 at 30 fps through the SUT's file-frame source.
2. Replay ``data_imu.csv`` at 10 Hz through the FC inbound (1 IMU per 3
video frames).
3. Observe the SUT's outbound estimate stream + the FDR archive.
4. Detect every (visual_propagated|dead_reckoned) → satellite_anchored
transition; compute drift = ||propagated_centre - new_anchor||.
5. Bin drifts by ``last_satellite_anchor_age_ms``; assert AC-2/AC-3/AC-4.
6. Emit ``e2e-results/run-${RUN_ID}/ft-p-02.csv`` with one row per pair.
What this file owns:
* The AC-1.3 logic above, wired through the harness's ``fc_adapter`` /
``vio_strategy`` parametrize matrix (AC-5).
* CSV evidence emission via the AZ-410-owned ``anchor_pair_detector``.
What this file does NOT own:
* The MP4 video-replay path → ``runner.helpers.frame_source_replay``
(still a stub; AZ-408 was about the synthetic-injection injectors,
not the video replayer); the scenario is marked
``@pytest.mark.deferred_ac(reason=...)`` until that helper lands.
* The FDR-archive iteration → ``runner.helpers.fdr_reader`` (AZ-594,
landed in batch 74) — the scenario still depends on a prepared SITL
replay fixture (AZ-595) that produces the per-run FDR archive.
* The MAVLink ``GLOBAL_POSITION_INT`` GT replay → handled by the
``imu_replay`` helper (AZ-594, landed in batch 74).
When ``E2E_SITL_REPLAY_DIR`` is set and points at a prepared SITL
replay fixture, this file's runtime path activates automatically; until
then the scenario skips via the shared `sitl_replay_ready` fixture
(AZ-595).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from runner.helpers import anchor_pair_detector as apd
@pytest.mark.traces_to("AC-1.3,AC-1,AC-2,AC-3,AC-4,AC-5")
def test_ft_p_02_derkachi_drift(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Full FT-P-02 scenario (AC-1.3). See module docstring.

    AC-1: anchor-pair detection from FDR stream — covered by
        ``anchor_pair_detector.detect_anchor_pairs``; unit-tested in
        ``test_anchor_pair_detector.py``.
    AC-2: visual-only drift bound (≥95 % < 100 m) — asserted on
        ``report.visual_only_pass_fraction``.
    AC-3: IMU-fused drift bound (≥95 % < 50 m) — asserted on
        ``report.imu_fused_pass_fraction``.
    AC-4: bin medians monotonic with age — asserted on
        ``report.monotonic_violations`` when at least 20 pairs exist.
    AC-5: parametrized across (fc_adapter, vio_strategy).

    Args:
        fc_adapter: Flight-controller adapter name. ``"ardupilot"`` selects
            the ``sitl-ardupilot`` host; any other value selects
            ``sitl-inav``.
        vio_strategy: VIO strategy name; used here only to name the CSV
            evidence file.
        evidence_dir: Directory that receives the CSV evidence. The FDR
            archive is looked up at ``<evidence_dir>/../run-<run_id>/fdr``.
            Accepts anything ``pathlib.Path`` can wrap.
        run_id: Identifier of the current run (part of the FDR path).
        nfr_recorder: Recorder exposing ``record_metric`` and ``partial``.
        sitl_replay_ready: When false the scenario is skipped.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "FT-P-02 full replay requires `E2E_SITL_REPLAY_DIR` to point at a "
            "prepared SITL replay fixture (AZ-595). Pure-logic ACs covered by "
            "e2e/_unit_tests/helpers/test_anchor_pair_detector.py."
        )

    # The replay helpers are imported only past the skip gate, so a run
    # without a replay fixture never has to resolve them.
    from runner.helpers import fdr_reader, imu_replay
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    # Normalise once: both the FDR lookup and the CSV output path derive from
    # the same directory, so they must agree on its type.
    evidence_root = Path(evidence_dir)

    # 1. Spin up the SUT through the boundary-driving fixtures
    #    (mock_suite_sat URL + sitl_observer for the requested fc_adapter +
    #    a frame-sink + a MAVLink emitter for the requested vio_strategy).
    #    The actual wiring lives in helpers; the scenario only orchestrates.
    sitl_host = "sitl-ardupilot" if fc_adapter == "ardupilot" else "sitl-inav"

    # 2. Replay video + IMU.
    sink = _resolve_frame_sink()
    emitter = _resolve_fc_inbound_emitter(fc_adapter, sitl_host)
    video_path = Path("/test-data/flight_derkachi/flight_derkachi.mp4")
    imu_csv = Path("/test-data/flight_derkachi/data_imu.csv")
    FrameSourceReplayer(sink).replay_video(video_path)
    imu_replay.ImuReplayer(emitter).replay(imu_csv)

    # 3. Crawl the FDR archive for the outbound estimate stream.
    fdr_root = evidence_root.parent / f"run-{run_id}" / "fdr"
    estimates: list[apd.FdrEstimate] = []
    for rec in fdr_reader.iter_records(fdr_root):
        if rec.record_type != "estimate":
            continue
        payload = rec.payload
        estimates.append(
            apd.FdrEstimate(
                monotonic_ms=int(rec.monotonic_ms),
                # Position and source label are mandatory: a missing key
                # raises instead of being silently defaulted.
                lat_deg=float(payload["lat_deg"]),  # type: ignore[arg-type]
                lon_deg=float(payload["lon_deg"]),  # type: ignore[arg-type]
                source_label=str(payload["source_label"]),  # type: ignore[arg-type]
                # The remaining fields fall back to neutral defaults.
                imu_fused=bool(payload.get("imu_fused", False)),
                cov_semi_major_m=float(payload.get("cov_semi_major_m", 0.0)),  # type: ignore[arg-type]
                last_satellite_anchor_age_ms=int(
                    payload.get("last_satellite_anchor_age_ms", 0)  # type: ignore[arg-type]
                ),
            )
        )

    # 4. Aggregate + write the per-(fc_adapter, vio_strategy) CSV evidence.
    report = apd.aggregate(estimates)
    apd.write_csv_evidence(
        report, evidence_root / f"ft-p-02-{fc_adapter}-{vio_strategy}.csv"
    )

    # 5. Record metrics for the NFR/csv reporter.
    nfr_recorder.record_metric(
        "ft_p_02.visual_only_pass_fraction", report.visual_only_pass_fraction, ac_id="AC-2"
    )
    nfr_recorder.record_metric(
        "ft_p_02.imu_fused_pass_fraction", report.imu_fused_pass_fraction, ac_id="AC-3"
    )
    nfr_recorder.record_metric("ft_p_02.total_pairs", float(len(report.pairs)), ac_id="AC-1")

    # 6. AC assertions. AC-2 / AC-3 are only evaluated when the matching
    #    pair population is non-empty.
    if len(report.visual_only_pairs) > 0:
        assert report.visual_only_pass_fraction >= 0.95, (
            f"AC-2 (visual-only drift <100 m) failed at "
            f"{report.visual_only_pass_fraction:.2%} over {len(report.visual_only_pairs)} pairs"
        )
    if len(report.imu_fused_pairs) > 0:
        assert report.imu_fused_pass_fraction >= 0.95, (
            f"AC-3 (IMU-fused drift <50 m) failed at "
            f"{report.imu_fused_pass_fraction:.2%} over {len(report.imu_fused_pairs)} pairs"
        )
    if len(report.pairs) >= 20:
        # AC-4 requires statistical power; small-N flights skip the
        # monotonicity check per the spec's "N<20 flagged" note.
        assert not report.monotonic_violations, (
            "AC-4 (monotonic drift vs anchor age) failed: "
            + "; ".join(report.monotonic_violations)
        )
    else:
        nfr_recorder.partial("AC-4", reason=f"N={len(report.pairs)} < 20 — statistical power flagged")
def _resolve_frame_sink():  # type: ignore[no-untyped-def]
    """Build the replay-mode `FrameSink` stand-in (counter-only; AZ-597).

    Returns:
        A newly constructed ``NullFrameSink`` from
        ``runner.helpers.replay_mode``.
    """
    # Resolved at call time rather than at module import time.
    from runner.helpers import replay_mode

    sink = replay_mode.NullFrameSink()
    return sink
def _resolve_fc_inbound_emitter(fc_adapter: str, host: str):  # type: ignore[no-untyped-def]
    """Build the replay-mode `FcInboundEmitter` stand-in (counter-only; AZ-597).

    Args:
        fc_adapter: Flight-controller adapter name. Not used by this
            implementation.
        host: SITL host name. Not used by this implementation.

    Returns:
        A newly constructed ``NullFcInboundEmitter`` from
        ``runner.helpers.replay_mode``.
    """
    # Resolved at call time rather than at module import time.
    from runner.helpers import replay_mode

    emitter = replay_mode.NullFcInboundEmitter()
    return emitter