"""AZ-894 — ``CsvReplayFcAdapter`` unit tests. Focused contract coverage for the thin :class:`FcAdapter` sibling that backs the CSV-driven replay input. The functional inbound/outbound plumbing the runtime loop relies on (CSV parsing, frame-stamped IMU draining, ESKF cold-start origin) is exercised in ``tests/unit/replay_input/test_csv_ground_truth.py`` and the AZ-404 e2e harness; here we pin the Protocol surface (open/close idempotency, build-flag refusal, source-set refusal, transport-less emit refusal) so a refactor of the adapter cannot silently regress Invariant 5 or the FcAdapter Protocol parity that the composition root depends on. Style: every test follows the Arrange / Act / Assert pattern. """ from __future__ import annotations from pathlib import Path import pytest from gps_denied_onboard._types.fc import FcKind, FlightState, Severity from gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter import ( CsvReplayFcAdapter, ) from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, FcEmitError, FcOpenError, SourceSetSwitchNotSupportedError, ) from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( ReplayPace, ) @pytest.fixture(autouse=True) def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "ON") class _FakeClock: """Minimal Clock stub returning a monotonic counter in ns.""" def __init__(self) -> None: self._t = 0 def monotonic_ns(self) -> int: self._t += 1 return self._t def sleep_until_ns(self, _: int) -> None: # pragma: no cover — unused. return None class _FakeFdr: """No-op FDR client stand-in (CsvReplayFcAdapter does not emit FDR yet).""" def enqueue(self, _record: object) -> None: # pragma: no cover — unused. return None @pytest.fixture() def csv_file(tmp_path: Path) -> Path: # Existence is the only check the adapter does at open() time — # the body never has to be parseable here (parsing lives in # csv_ground_truth.load_csv_ground_truth, tested separately). path = tmp_path / "data_imu.csv" path.write_text("placeholder", encoding="utf-8") return path def _make_adapter(csv_path: Path) -> CsvReplayFcAdapter: return CsvReplayFcAdapter( csv_path=csv_path, target_fc_dialect=FcKind.ARDUPILOT_PLANE, clock=_FakeClock(), fdr_client=_FakeFdr(), pace=ReplayPace.ASAP, ) # ---------------------------------------------------------------------- # Build flag def test_construction_refused_when_build_flag_off( monkeypatch: pytest.MonkeyPatch, csv_file: Path ) -> None: # Arrange monkeypatch.setenv("BUILD_CSV_REPLAY_ADAPTER", "OFF") # Act + Assert with pytest.raises(FcAdapterConfigError, match="BUILD_CSV_REPLAY_ADAPTER"): _make_adapter(csv_file) def test_construction_rejects_non_path(csv_file: Path) -> None: # Arrange — argument intentionally a str rather than Path. # Act + Assert with pytest.raises(FcAdapterConfigError, match="csv_path must be a pathlib.Path"): CsvReplayFcAdapter( csv_path=str(csv_file), # type: ignore[arg-type] target_fc_dialect=FcKind.ARDUPILOT_PLANE, clock=_FakeClock(), fdr_client=_FakeFdr(), pace=ReplayPace.ASAP, ) def test_construction_rejects_unknown_dialect(csv_file: Path) -> None: # Act + Assert with pytest.raises(FcAdapterConfigError, match="target_fc_dialect"): CsvReplayFcAdapter( csv_path=csv_file, target_fc_dialect=FcKind.GCS_QGC, clock=_FakeClock(), fdr_client=_FakeFdr(), pace=ReplayPace.ASAP, ) # ---------------------------------------------------------------------- # open / close def test_open_refused_when_csv_missing(tmp_path: Path) -> None: # Arrange adapter = _make_adapter(tmp_path / "absent.csv") # Act + Assert with pytest.raises(FcOpenError, match="CSV file not found"): adapter.open() def test_double_open_raises(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) adapter.open() # Act + Assert with pytest.raises(FcOpenError, match="already opened"): adapter.open() def test_close_is_idempotent_before_open(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) # Act — close() before open() is a documented no-op (parity with tlog). adapter.close() adapter.close() # Assert — no exception raised; state remains closeable. assert True def test_close_is_idempotent_after_open(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) adapter.open() # Act adapter.close() adapter.close() # Assert assert True # ---------------------------------------------------------------------- # Protocol parity def test_subscribe_returns_real_subscription_handle(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) adapter.open() # Act subscription = adapter.subscribe_telemetry(lambda _frame: None) # Assert — handle exposes the cancel() entry point even though the # bus is intentionally never fed (replay loop reads CSV directly). assert hasattr(subscription, "cancel") subscription.cancel() def test_source_set_switch_unsupported(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) # Act + Assert with pytest.raises(SourceSetSwitchNotSupportedError): adapter.request_source_set_switch() def test_current_flight_state_returns_init_signal(csv_file: Path) -> None: # Arrange — CSV carries no MAVLink HEARTBEAT, so the adapter has # nothing to latch; the contract is to return an INIT-state signal. adapter = _make_adapter(csv_file) # Act signal = adapter.current_flight_state() # Assert assert signal.state is FlightState.INIT assert signal.last_valid_gps_hint_wgs84 is None assert signal.last_valid_gps_age_ms is None # ---------------------------------------------------------------------- # Outbound (Invariant 5) def test_emit_external_position_raises_without_transport(csv_file: Path) -> None: # Arrange — no MavlinkTransport injected → adapter falls back to the # AZ-399 raise-on-emit contract, mirroring TlogReplayFcAdapter. adapter = _make_adapter(csv_file) adapter.open() # Act + Assert with pytest.raises(FcEmitError, match="does not emit"): adapter.emit_external_position(_dummy_estimator_output()) def test_emit_status_text_raises_without_transport(csv_file: Path) -> None: # Arrange adapter = _make_adapter(csv_file) adapter.open() # Act + Assert with pytest.raises(FcEmitError, match="does not emit"): adapter.emit_status_text("hello", severity=Severity.INFO) # ---------------------------------------------------------------------- # Helpers def _dummy_estimator_output() -> object: # The transport-less emit path short-circuits with FcEmitError before # reading any field, so a duck-typed stand-in is enough — duplicating # the full EstimatorOutput (UUID frame_id, 6x6 covariance, etc.) # would only hide the actual contract being tested. from types import SimpleNamespace return SimpleNamespace()