"""AZ-317 ``FlightStateGate`` unit tests. Covers all eight acceptance criteria + NFRs from ``_docs/02_tasks/done/AZ-317_c11_flight_state_gate.md`` (after the batch-38 archive). Uses a hand-rolled fake :class:`FlightStateSource` and a list-backed log handler so assertions stay close to the captured records. """ from __future__ import annotations import logging import time from datetime import datetime, timezone import pytest from gps_denied_onboard.components.c11_tile_manager import ( FlightStateGate, FlightStateNotOnGroundError, FlightStateSignal, FlightStateSource, ) # ---------------------------------------------------------------------- # Helpers # ---------------------------------------------------------------------- class _FakeSource: """Hand-rolled :class:`FlightStateSource` returning a fixed signal. Spies on every ``current_flight_state`` call so AC-8 can assert the gate calls the source exactly once per ``confirm_on_ground``. """ def __init__(self, signal: FlightStateSignal) -> None: self._signal = signal self.call_count = 0 def current_flight_state(self) -> FlightStateSignal: self.call_count += 1 return self._signal class _RaisingSource: """:class:`FlightStateSource` whose ``current_flight_state`` raises.""" def __init__(self, exc: Exception) -> None: self._exc = exc self.call_count = 0 def current_flight_state(self) -> FlightStateSignal: self.call_count += 1 raise self._exc class _PartialFake: """Type stub WITHOUT ``current_flight_state`` for AC-6 negative case.""" def something_else(self) -> str: return "noop" def _build_gate( *, source: FlightStateSource, ) -> tuple[FlightStateGate, list[logging.LogRecord]]: records: list[logging.LogRecord] = [] class _ListHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: records.append(record) logger = logging.getLogger(f"test_az317_{id(records)}") logger.handlers.clear() logger.addHandler(_ListHandler()) logger.setLevel(logging.DEBUG) logger.propagate = False return FlightStateGate(source=source, logger=logger), records def _kinds(records: list[logging.LogRecord]) -> list[str]: return [getattr(r, "kind", None) for r in records] # ---------------------------------------------------------------------- # AC-1: ON_GROUND passes # ---------------------------------------------------------------------- def test_ac1_on_ground_returns_signal_and_emits_info_log() -> None: # Arrange source = _FakeSource(FlightStateSignal.ON_GROUND) gate, records = _build_gate(source=source) # Act result = gate.confirm_on_ground() # Assert assert result is FlightStateSignal.ON_GROUND assert _kinds(records) == ["c11.upload.flight_state_confirmed"] assert records[0].levelname == "INFO" assert source.call_count == 1 # ---------------------------------------------------------------------- # AC-2: IN_FLIGHT raises # ---------------------------------------------------------------------- def test_ac2_in_flight_raises_with_observed_and_error_log() -> None: # Arrange source = _FakeSource(FlightStateSignal.IN_FLIGHT) gate, records = _build_gate(source=source) # Act + Assert with pytest.raises(FlightStateNotOnGroundError) as excinfo: gate.confirm_on_ground() assert excinfo.value.observed is FlightStateSignal.IN_FLIGHT assert "IN_FLIGHT" in str(excinfo.value) assert _kinds(records) == ["c11.upload.refused.flight_state"] assert records[0].levelname == "ERROR" # ---------------------------------------------------------------------- # AC-3: UNKNOWN raises (fail-closed) # ---------------------------------------------------------------------- def test_ac3_unknown_raises_fail_closed() -> None: # Arrange source = _FakeSource(FlightStateSignal.UNKNOWN) gate, records = _build_gate(source=source) # Act + Assert with pytest.raises(FlightStateNotOnGroundError) as excinfo: gate.confirm_on_ground() assert excinfo.value.observed is FlightStateSignal.UNKNOWN assert _kinds(records) == ["c11.upload.refused.flight_state"] # ---------------------------------------------------------------------- # AC-4: TAKING_OFF and LANDING raise # ---------------------------------------------------------------------- @pytest.mark.parametrize( "transition_signal", [FlightStateSignal.TAKING_OFF, FlightStateSignal.LANDING], ) def test_ac4_transition_states_raise( transition_signal: FlightStateSignal, ) -> None: # Arrange source = _FakeSource(transition_signal) gate, records = _build_gate(source=source) # Act + Assert with pytest.raises(FlightStateNotOnGroundError) as excinfo: gate.confirm_on_ground() assert excinfo.value.observed is transition_signal assert _kinds(records) == ["c11.upload.refused.flight_state"] # ---------------------------------------------------------------------- # AC-5: source exception → UNKNOWN with __cause__ chained # ---------------------------------------------------------------------- def test_ac5_source_exception_maps_to_unknown_and_preserves_cause() -> None: # Arrange original = RuntimeError("FC disconnected") source = _RaisingSource(original) gate, records = _build_gate(source=source) # Act + Assert with pytest.raises(FlightStateNotOnGroundError) as excinfo: gate.confirm_on_ground() assert excinfo.value.observed is FlightStateSignal.UNKNOWN assert excinfo.value.__cause__ is original assert _kinds(records) == ["c11.upload.refused.flight_state"] assert records[0].levelname == "ERROR" assert "FC disconnected" in records[0].kv["source_error"] # ---------------------------------------------------------------------- # AC-6: FlightStateSource Protocol is conformance-checkable # ---------------------------------------------------------------------- def test_ac6_protocol_isinstance_check_distinguishes_conforming_from_partial() -> None: # Arrange conforming = _FakeSource(FlightStateSignal.ON_GROUND) non_conforming = _PartialFake() # Assert assert isinstance(conforming, FlightStateSource) assert not isinstance(non_conforming, FlightStateSource) # ---------------------------------------------------------------------- # AC-7: Error carries diagnostic fields # ---------------------------------------------------------------------- def test_ac7_error_carries_observed_and_observed_at_with_message_format() -> None: # Arrange source = _FakeSource(FlightStateSignal.IN_FLIGHT) gate, _ = _build_gate(source=source) # Act with pytest.raises(FlightStateNotOnGroundError) as excinfo: gate.confirm_on_ground() # Assert assert excinfo.value.observed is FlightStateSignal.IN_FLIGHT assert isinstance(excinfo.value.observed_at, datetime) assert excinfo.value.observed_at.tzinfo == timezone.utc assert excinfo.value.observed_at.microsecond == 0 assert str(excinfo.value).startswith("Upload refused: flight state is ") # ---------------------------------------------------------------------- # AC-8: Gate calls source exactly once # ---------------------------------------------------------------------- def test_ac8_gate_calls_source_exactly_once_no_retry() -> None: # Arrange source = _FakeSource(FlightStateSignal.IN_FLIGHT) gate, _ = _build_gate(source=source) # Act with pytest.raises(FlightStateNotOnGroundError): gate.confirm_on_ground() # Assert assert source.call_count == 1 # ---------------------------------------------------------------------- # NFR-perf: confirm_on_ground microbench p99 ≤ 1 ms # ---------------------------------------------------------------------- def test_nfr_perf_microbench_under_one_ms_p99() -> None: # Arrange source = _FakeSource(FlightStateSignal.ON_GROUND) gate, _ = _build_gate(source=source) iterations = 5_000 # Act samples_ns: list[int] = [] for _ in range(iterations): start = time.perf_counter_ns() gate.confirm_on_ground() samples_ns.append(time.perf_counter_ns() - start) # Assert samples_ns.sort() p99_ns = samples_ns[int(iterations * 0.99) - 1] assert p99_ns < 1_000_000, ( f"p99 latency {p99_ns} ns exceeds 1 ms (1_000_000 ns) NFR budget" ) # ---------------------------------------------------------------------- # NFR-reliability-fail-closed: every non-ON_GROUND state raises # ---------------------------------------------------------------------- @pytest.mark.parametrize( "non_on_ground_signal", [ FlightStateSignal.IN_FLIGHT, FlightStateSignal.TAKING_OFF, FlightStateSignal.LANDING, FlightStateSignal.UNKNOWN, ], ) def test_nfr_reliability_fail_closed_matrix_complete( non_on_ground_signal: FlightStateSignal, ) -> None: # Arrange source = _FakeSource(non_on_ground_signal) gate, _ = _build_gate(source=source) # Act + Assert with pytest.raises(FlightStateNotOnGroundError): gate.confirm_on_ground()