"""AZ-591 — airborne_bootstrap registry-population AC tests. Verifies the contract at ``_docs/02_tasks/todo/AZ-591_compose_root_per_binary_bootstrap.md``: * AC-1: ``register_airborne_strategies()`` populates the central ``_STRATEGY_REGISTRY`` with one entry per (component, strategy) pair the airborne binary supports (14 total across 7 slots). * AC-2: ``compose_root(config, pre_constructed=...)`` reaches completion without ``StrategyNotLinkedError`` when every component selects its default strategy AND ``pre_constructed`` carries the required infrastructure deps. * AC-3: registering twice in the same process is a no-op (idempotent). * AC-4: an unknown strategy in config surfaces ``StrategyNotLinkedError`` with the available-strategies list populated. * AC-5: airborne registrations are tier-isolated from ``compose_operator``. """ from __future__ import annotations from collections.abc import Iterator from dataclasses import dataclass from typing import Any import pytest from gps_denied_onboard.config import Config from gps_denied_onboard.runtime_root import ( StrategyNotLinkedError, clear_strategy_registry, compose_operator, compose_root, list_registered_strategies, ) from gps_denied_onboard.runtime_root.airborne_bootstrap import ( AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS, AirborneBootstrapError, register_airborne_strategies, ) @dataclass(frozen=True) class _C1Block: strategy: str = "klt_ransac" @dataclass(frozen=True) class _C2Block: strategy: str = "net_vlad" @dataclass(frozen=True) class _C25Block: strategy: str = "inlier_count" @dataclass(frozen=True) class _C3Block: strategy: str = "disk_lightglue" @dataclass(frozen=True) class _C35Block: strategy: str = "adhop" @dataclass(frozen=True) class _C4Block: strategy: str = "opencv_gtsam" @dataclass(frozen=True) class _C5Block: strategy: str = "gtsam_isam2" _EXPECTED_REGISTRATIONS: dict[str, tuple[str, ...]] = { "c1_vio": ("klt_ransac", "okvis2", "vins_mono"), "c2_vpr": ( "eigen_places", "mega_loc", "mix_vpr", "net_vlad", "salad", "sela_vpr", "ultra_vpr", ), "c2_5_rerank": ("inlier_count",), "c3_matcher": ("aliked_lightglue", "disk_lightglue"), "c3_5_adhop": ("adhop",), "c4_pose": ("opencv_gtsam",), "c5_state": ("eskf", "gtsam_isam2"), } @pytest.fixture(autouse=True) def _isolated_registry() -> Iterator[None]: clear_strategy_registry() yield clear_strategy_registry() @pytest.fixture def _airborne_env(monkeypatch: pytest.MonkeyPatch) -> None: for name, value in ( ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), ("GPS_DENIED_TIER", "1"), ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), ("CAMERA_CALIBRATION_PATH", "/etc/gps-denied/calib.yml"), ("LOG_LEVEL", "INFO"), ("LOG_SINK", "console"), ("INFERENCE_BACKEND", "pytorch_fp16"), ("FDR_PATH", "/var/lib/gps-denied/fdr"), ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), ("MAVLINK_SIGNING_KEY", "ZZZZZZZZ"), ): monkeypatch.setenv(name, value) @pytest.fixture def _operator_env(monkeypatch: pytest.MonkeyPatch) -> None: for name, value in ( ("GPS_DENIED_FC_PROFILE", "ardupilot_plane"), ("GPS_DENIED_TIER", "1"), ("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"), ("CAMERA_CALIBRATION_PATH", "/etc/gps-denied/calib.yml"), ("LOG_LEVEL", "INFO"), ("LOG_SINK", "console"), ("INFERENCE_BACKEND", "pytorch_fp16"), ("FDR_PATH", "/var/lib/gps-denied/fdr"), ("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"), ("SATELLITE_PROVIDER_URL", "http://localhost:8080"), ): monkeypatch.setenv(name, value) def test_ac1_register_airborne_strategies_populates_all_slots() -> None: # Arrange / Act register_airborne_strategies() # Assert for slug, expected_strategies in _EXPECTED_REGISTRATIONS.items(): registered = list_registered_strategies(slug) assert sorted(registered) == list(expected_strategies), ( f"slot {slug!r}: expected {list(expected_strategies)}, " f"got {registered}" ) def test_ac3_idempotent_double_register_is_a_no_op() -> None: # Arrange register_airborne_strategies() snapshot = { slug: sorted(list_registered_strategies(slug)) for slug in _EXPECTED_REGISTRATIONS } # Act register_airborne_strategies() # Assert for slug, strategies in snapshot.items(): assert sorted(list_registered_strategies(slug)) == strategies, ( f"slot {slug!r}: second register_airborne_strategies() call " "must not change the registered set" ) def test_ac4_unknown_strategy_in_config_raises_strategy_not_linked( _airborne_env: None, ) -> None: # Arrange register_airborne_strategies() config = Config.with_blocks(c2_vpr=_C2Block(strategy="not_a_real_strategy")) # Act with pytest.raises(StrategyNotLinkedError) as info: compose_root(config) # Assert assert info.value.strategy_name == "not_a_real_strategy" assert info.value.component_slug == "c2_vpr" assert info.value.available_strategies == [ "eigen_places", "mega_loc", "mix_vpr", "net_vlad", "salad", "sela_vpr", "ultra_vpr", ] def test_ac5_compose_operator_rejects_airborne_tier( _operator_env: None, ) -> None: # Arrange register_airborne_strategies() config = Config.with_blocks(c1_vio=_C1Block()) # Act with pytest.raises(StrategyNotLinkedError) as info: compose_operator(config) # Assert assert info.value.component_slug == "c1_vio" assert "airborne" in info.value.reason or "tier" in info.value.reason def test_ac2_compose_root_reaches_completion_with_pre_constructed_infra( _airborne_env: None, monkeypatch: pytest.MonkeyPatch, ) -> None: """End-to-end smoke test of the AC-2 contract. The wrappers call into per-component factories that require heavy runtime deps (gtsam, opencv, lightglue, etc.). To keep this a unit test we monkeypatch the wrapper functions themselves to assert the ``constructed`` dict is wired correctly and return sentinel objects. The assertion is that ``compose_root`` walks every slot in topological order and stores the wrapper's return value under that slot. """ # Arrange — stub each wrapper to record invocation order + return a # slug-tagged sentinel. We patch the registered wrapper objects directly # on the module so register_airborne_strategies() picks up the stubs. from gps_denied_onboard.runtime_root import airborne_bootstrap as ab invocation_order: list[str] = [] def _make_stub(slug: str) -> Any: def _stub(config: Any, constructed: dict[str, Any]) -> str: invocation_order.append(slug) return f"<{slug}>" return _stub monkeypatch.setattr(ab, "_c1_vio_wrapper", _make_stub("c1_vio")) monkeypatch.setattr(ab, "_c2_vpr_wrapper", _make_stub("c2_vpr")) monkeypatch.setattr(ab, "_c2_5_rerank_wrapper", _make_stub("c2_5_rerank")) monkeypatch.setattr(ab, "_c3_matcher_wrapper", _make_stub("c3_matcher")) monkeypatch.setattr(ab, "_c3_5_adhop_wrapper", _make_stub("c3_5_adhop")) monkeypatch.setattr(ab, "_c4_pose_wrapper", _make_stub("c4_pose")) monkeypatch.setattr(ab, "_c5_state_wrapper", _make_stub("c5_state")) # Rebuild the registration table so the patched wrappers are picked up. monkeypatch.setattr( ab, "_AIRBORNE_REGISTRATIONS", ( ("c1_vio", ab._C1_VIO_STRATEGIES, ab._c1_vio_wrapper, ()), ("c2_vpr", ab._C2_VPR_STRATEGIES, ab._c2_vpr_wrapper, ()), ( "c2_5_rerank", ab._C2_5_RERANK_STRATEGIES, ab._c2_5_rerank_wrapper, ("c2_vpr",), ), ( "c3_matcher", ab._C3_MATCHER_STRATEGIES, ab._c3_matcher_wrapper, (), ), ( "c3_5_adhop", ab._C3_5_ADHOP_STRATEGIES, ab._c3_5_adhop_wrapper, ("c3_matcher",), ), ( "c4_pose", ab._C4_POSE_STRATEGIES, ab._c4_pose_wrapper, ("c1_vio", "c3_matcher"), ), ( "c5_state", ab._C5_STATE_STRATEGIES, ab._c5_state_wrapper, ("c1_vio", "c4_pose"), ), ), ) register_airborne_strategies() config = Config.with_blocks( c1_vio=_C1Block(), c2_vpr=_C2Block(), c2_5_rerank=_C25Block(), c3_matcher=_C3Block(), c3_5_adhop=_C35Block(), c4_pose=_C4Block(), c5_state=_C5Block(), ) # Act root = compose_root(config, pre_constructed={}) # Assert assert set(root.components.keys()) == set(_EXPECTED_REGISTRATIONS) for slug in _EXPECTED_REGISTRATIONS: assert root.components[slug] == f"<{slug}>" # Dependencies construct strictly before dependents (declared in the # patched _AIRBORNE_REGISTRATIONS table above). idx = invocation_order.index assert idx("c2_vpr") < idx("c2_5_rerank") assert idx("c3_matcher") < idx("c3_5_adhop") assert idx("c1_vio") < idx("c4_pose") assert idx("c3_matcher") < idx("c4_pose") assert idx("c1_vio") < idx("c5_state") assert idx("c4_pose") < idx("c5_state") def test_wrapper_raises_clear_error_on_missing_pre_constructed_dep( _airborne_env: None, ) -> None: """Negative path for the production wrappers: deps must be in constructed.""" # Arrange register_airborne_strategies() config = Config.with_blocks(c1_vio=_C1Block()) # Act with pytest.raises(AirborneBootstrapError) as info: compose_root(config, pre_constructed={}) # Assert msg = str(info.value) assert "c1_vio" in msg assert "c13_fdr" in msg assert "pre_constructed" in msg def test_required_keys_table_is_consistent_with_expected_registrations() -> None: # Arrange / Assert — every registered slot has documented infrastructure # dep keys; no surprise sources of pre_constructed lookups outside the # documented table. assert set(AIRBORNE_REQUIRED_PRE_CONSTRUCTED_KEYS.keys()) == set( _EXPECTED_REGISTRATIONS )