"""Behavioural tests for the AZ-408 FC inbound proxy patch. Covers AC-3 (video↔proxy alignment ≤ 40 ms — verified end-to-end via the fake clock here; the runtime path observes the same invariant) and the proxy's pass-through / spoof-replace semantics. """ from __future__ import annotations import json from pathlib import Path import pytest from fixtures.injectors.fc_proxy import BlackoutSpoofProxy, SpoofGpsRecord class _FakeClock: """Monotonic ms clock that the test advances manually.""" def __init__(self, start_ms: int = 0) -> None: self.now_ms = start_ms def __call__(self) -> int: return self.now_ms def advance(self, ms: int) -> None: self.now_ms += ms def _spoof_records() -> list[SpoofGpsRecord]: return [ SpoofGpsRecord(monotonic_ms=1000 + i * 100, lat_deg=50.0 + i * 0.001, lon_deg=36.1, alt_m=300.0, fix_type=3, hdop=1.0) for i in range(5) ] def test_proxy_passes_through_outside_window() -> None: # Arrange — schedule the first blackout 500 ms in the future. The # activate() call binds proxy_time(now) = 0; the window opens at # window_start_ms = 500 in proxy time. Now (proxy_time = 0) is # outside [500, 1000], so the proxy must pass through. clock = _FakeClock(start_ms=1000) proxy = BlackoutSpoofProxy(window_start_ms=500, window_end_ms=1000, spoof_gps=_spoof_records()) proxy.activate(now_ms_provider=clock, first_blackout_ms=1500) msg = {"lat_deg": 49.9, "lon_deg": 36.0, "alt_m": 280.0} # Act out = proxy.process_inbound_message(msg) # Assert assert out == msg assert "__spoofed__" not in out def test_proxy_spoofs_inside_window() -> None: # Arrange clock = _FakeClock(start_ms=0) proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=500, spoof_gps=_spoof_records()) proxy.activate(now_ms_provider=clock, first_blackout_ms=0) msg = {"lat_deg": 49.9, "lon_deg": 36.0, "alt_m": 280.0} # Act — clock=0 ⇒ proxy_time(0) = 0 (inside window) out = proxy.process_inbound_message(msg) # Assert assert out["__spoofed__"] is True assert out["lat_deg"] != msg["lat_deg"] assert out["fix_type"] == 3 def test_proxy_returns_to_passthrough_after_window() -> None: # Arrange clock = _FakeClock(start_ms=0) proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=500, spoof_gps=_spoof_records()) proxy.activate(now_ms_provider=clock, first_blackout_ms=0) # Act — advance past end of window clock.advance(1000) msg = {"lat_deg": 50.0, "lon_deg": 36.0, "alt_m": 300.0} out = proxy.process_inbound_message(msg) # Assert assert out == msg def test_alignment_err_below_40ms_when_clock_matches_first_blackout() -> None: """AC-3: when the test harness calls activate() at the same ms the first blackout frame fires, alignment error is 0.""" # Arrange clock = _FakeClock(start_ms=12_345) proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=500, spoof_gps=_spoof_records()) # Act report = proxy.activate(now_ms_provider=clock, first_blackout_ms=12_345) # Assert assert report.alignment_err_ms == 0 assert report.alignment_err_ms <= 40 def test_alignment_err_within_budget_under_normal_clock_skew() -> None: """Real harness can have a 30 ms skew between video & proxy; still inside AC-3.""" # Arrange clock = _FakeClock(start_ms=12_400) proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=500, spoof_gps=_spoof_records()) # Act — first_blackout_ms is 30 ms earlier than clock (harness skew) report = proxy.activate(now_ms_provider=clock, first_blackout_ms=12_370) # Assert assert report.alignment_err_ms == 30 assert report.alignment_err_ms <= 40 def test_exhausting_spoof_list_repeats_last() -> None: """When the spoofed-GPS list is drained, the FC keeps seeing the last record.""" # Arrange clock = _FakeClock(start_ms=0) spoofs = _spoof_records() proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=10_000, spoof_gps=spoofs) proxy.activate(now_ms_provider=clock, first_blackout_ms=0) # Act — pull 10 frames (more than the 5 in the list) outs = [proxy.process_inbound_message({"lat_deg": 0, "lon_deg": 0, "alt_m": 0}) for _ in range(10)] # Assert — last 5 outputs all reuse the final spoof record last = spoofs[-1] for o in outs[-3:]: assert o["lat_deg"] == last.lat_deg assert o["lon_deg"] == last.lon_deg def test_from_schedule_file_round_trip(tmp_path: Path) -> None: # Arrange sched_path = tmp_path / "schedule.json" sched_path.write_text( json.dumps( { "window_start_ms": 0, "window_end_ms": 200, "max_alignment_err_ms": 40.0, "blackout_frame_indices": [0, 1, 2], "spoof_gps": [ {"monotonic_ms": 0, "lat_deg": 50.0, "lon_deg": 36.0, "alt_m": 300.0, "fix_type": 3, "hdop": 1.0}, ], } ) ) # Act proxy = BlackoutSpoofProxy.from_schedule_file(sched_path) proxy.activate(now_ms_provider=lambda: 0) out = proxy.process_inbound_message({"lat_deg": 0, "lon_deg": 0, "alt_m": 0}) # Assert assert out["__spoofed__"] is True assert out["lat_deg"] == 50.0 def test_from_schedule_file_missing_raises(tmp_path: Path) -> None: # Arrange / Act / Assert with pytest.raises(FileNotFoundError): BlackoutSpoofProxy.from_schedule_file(tmp_path / "missing.json") def test_process_before_activate_raises() -> None: # Arrange proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=100, spoof_gps=_spoof_records()) # Act / Assert with pytest.raises(RuntimeError, match="not activated"): proxy.process_inbound_message({}) def test_in_window_false_before_activate() -> None: # Arrange proxy = BlackoutSpoofProxy(window_start_ms=0, window_end_ms=100, spoof_gps=[]) # Act / Assert assert proxy.in_window() is False