mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 12:11:13 +00:00
3acc7f33dd
AZ-270: composition root with strategy registry, tier-gated lookup, topo-order construction, all-or-nothing teardown, StrategyNotLinkedError payload. AZ-272: orjson-backed FdrRecord serialise/parse with forward-compat for unknown payload + top-level fields and canonical overrun-record shape. AZ-279: pyproj-backed WGS84/ECEF/ENU + OSM slippy-map tile math with WgsConversionError for shape/range/zoom guards. AZ-281: strict EngineFilenameSchema build/parse/matches_host with anchored regex + enum validation; round-trip identity by construction. AZ-283: dtype-preserving (fp16/fp32) single + batch L2 normaliser with zero-norm safety and descriptor_metric() source-of-truth. pyproject.toml pins pyproj>=3.6 and orjson>=3.9 (named-backend deps per the AZ-272 / AZ-279 contracts). New DTOs LatLonAlt + BoundingBox and EngineCacheKey + HostCapabilities land in _types/ to back the helper contracts. 203 unit tests pass (64 new). Review verdict: PASS_WITH_WARNINGS; findings are perf-NFR deferrals + dep amendment + minor docstring polish. Co-authored-by: Cursor <cursoragent@cursor.com>
271 lines
9.6 KiB
Python
271 lines
9.6 KiB
Python
"""AZ-270 — Composition Root AC tests.
|
|
|
|
Verifies the contract at ``_docs/02_document/contracts/shared_config/composition_root_protocol.md`` v1.0.0.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from collections.abc import Iterator
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from gps_denied_onboard.config import Config
|
|
from gps_denied_onboard.runtime_root import (
|
|
RuntimeRoot,
|
|
StrategyNotLinkedError,
|
|
clear_strategy_registry,
|
|
compose_operator,
|
|
compose_root,
|
|
list_registered_strategies,
|
|
register_strategy,
|
|
)
|
|
|
|
# Directory two levels above the folder that holds this test module. Presumably the
# repository root: the architecture lint in this file looks for ``src/`` beneath it.
_REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
@dataclass(frozen=True)
class _C1Block:
    """Immutable stand-in for the ``c1_vio`` config block; carries only a strategy name."""

    # Strategy name the composition root is asked to resolve for this block.
    strategy: str = "okvis2"
|
|
|
|
|
|
@dataclass(frozen=True)
class _C4Block:
    """Immutable stand-in for the ``c4_pose`` config block; carries only a strategy name."""

    # Strategy name the composition root is asked to resolve for this block.
    strategy: str = "opencv_gtsam"
|
|
|
|
|
|
@dataclass(frozen=True)
class _C5Block:
    """Immutable stand-in for the ``c5_state`` config block; carries only a strategy name."""

    # Strategy name the composition root is asked to resolve for this block.
    strategy: str = "gtsam_isam2"
|
|
|
|
|
|
@dataclass(frozen=True)
class _C11Block:
    """Immutable config-block stand-in that defaults to the ``ardupilot_tile_manager`` strategy."""

    # Strategy name the composition root would be asked to resolve for this block.
    strategy: str = "ardupilot_tile_manager"
|
|
|
|
|
|
@dataclass
class _OrderRecorder:
    """Mutable log that the test factories append to as they are invoked."""

    # Component slugs, in the order their factories ran.
    constructed: list[str]
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _isolated_registry() -> Iterator[None]:
    """Clear the strategy registry before and after every test in this module."""
    # Setup: start from whatever state ``clear_strategy_registry`` leaves behind.
    clear_strategy_registry()

    yield

    # Teardown: do not let this test's registrations reach the next one.
    clear_strategy_registry()
|
|
|
|
|
|
@pytest.fixture
def _airborne_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Populate the environment variables used by the ``compose_root`` tests.

    Values are applied with ``monkeypatch.setenv``, so pytest undoes them
    when the requesting test finishes.
    """
    # Insertion order is preserved, so variables are set in the listed order.
    environment = {
        "GPS_DENIED_FC_PROFILE": "ardupilot_plane",
        "GPS_DENIED_TIER": "1",
        "DB_URL": "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied",
        "CAMERA_CALIBRATION_PATH": "/etc/gps-denied/calib.yml",
        "LOG_LEVEL": "INFO",
        "LOG_SINK": "console",
        "INFERENCE_BACKEND": "pytorch_fp16",
        "FDR_PATH": "/var/lib/gps-denied/fdr",
        "TILE_CACHE_PATH": "/var/lib/gps-denied/tiles",
        "MAVLINK_SIGNING_KEY": "ZZZZZZZZ",
    }
    for variable, setting in environment.items():
        monkeypatch.setenv(variable, setting)
|
|
|
|
|
|
@pytest.fixture
def _operator_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Populate the environment variables used by the ``compose_operator`` tests.

    Values are applied with ``monkeypatch.setenv``, so pytest undoes them
    when the requesting test finishes.
    """
    # Insertion order is preserved, so variables are set in the listed order.
    environment = {
        "GPS_DENIED_FC_PROFILE": "ardupilot_plane",
        "GPS_DENIED_TIER": "1",
        "DB_URL": "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied",
        "CAMERA_CALIBRATION_PATH": "/etc/gps-denied/calib.yml",
        "LOG_LEVEL": "INFO",
        "LOG_SINK": "console",
        "INFERENCE_BACKEND": "pytorch_fp16",
        "FDR_PATH": "/var/lib/gps-denied/fdr",
        "TILE_CACHE_PATH": "/var/lib/gps-denied/tiles",
        "SATELLITE_PROVIDER_URL": "http://localhost:8080",
    }
    for variable, setting in environment.items():
        monkeypatch.setenv(variable, setting)
|
|
|
|
|
|
def _register_okvis2(recorder: _OrderRecorder) -> None:
    """Register an airborne ``okvis2`` strategy for ``c1_vio`` that logs its construction.

    The registered factory appends ``"c1_vio"`` to ``recorder.constructed``
    each time it runs and returns a ``(slug, strategy)`` tuple as the component.
    """
    slug = "c1_vio"
    strategy_name = "okvis2"

    def factory(config: Config, components: dict[str, object]) -> object:
        recorder.constructed.append(slug)
        return (slug, strategy_name)

    register_strategy(slug, strategy_name, factory, tier="airborne")
|
|
|
|
|
|
def _register_c4(recorder: _OrderRecorder) -> None:
    """Register an airborne ``opencv_gtsam`` strategy for ``c4_pose`` that logs its construction.

    The registered factory appends ``"c4_pose"`` to ``recorder.constructed``
    each time it runs and returns a ``(slug, strategy)`` tuple as the component.
    """
    slug = "c4_pose"
    strategy_name = "opencv_gtsam"

    def factory(config: Config, components: dict[str, object]) -> object:
        recorder.constructed.append(slug)
        return (slug, strategy_name)

    register_strategy(slug, strategy_name, factory, tier="airborne")
|
|
|
|
|
|
def _register_c5(recorder: _OrderRecorder) -> None:
    """Register an airborne ``gtsam_isam2`` strategy for ``c5_state`` that checks its prerequisites.

    The strategy is declared as depending on ``c1_vio`` and ``c4_pose``. Its
    factory asserts that both are already in the ``components`` mapping it
    receives, then appends ``"c5_state"`` to ``recorder.constructed``.
    """
    prerequisites = ("c1_vio", "c4_pose")

    def factory(config: Config, components: dict[str, object]) -> object:
        # Fail loudly if the composition root ran this factory too early.
        assert "c1_vio" in components, "c5_state factory ran before c1_vio existed"
        assert "c4_pose" in components, "c5_state factory ran before c4_pose existed"
        recorder.constructed.append("c5_state")
        return ("c5_state", "gtsam_isam2")

    register_strategy(
        "c5_state",
        "gtsam_isam2",
        factory,
        tier="airborne",
        depends_on=prerequisites,
    )
|
|
|
|
|
|
def test_ac1_default_deployment_composes(_airborne_env: None) -> None:
    """AC-1: with all three strategies registered, ``compose_root`` builds every configured component."""
    order_log = _OrderRecorder(constructed=[])
    for register in (_register_okvis2, _register_c4, _register_c5):
        register(order_log)

    blocks = {
        "c1_vio": _C1Block(),
        "c4_pose": _C4Block(),
        "c5_state": _C5Block(),
    }
    composed = compose_root(Config.with_blocks(**blocks))

    assert isinstance(composed, RuntimeRoot)
    assert composed.binary == "airborne"
    expected_slugs = {"c1_vio", "c4_pose", "c5_state"}
    assert set(composed.components.keys()) == expected_slugs
|
|
|
|
|
|
def test_ac2_strategy_not_linked_raises_with_payload(_airborne_env: None) -> None:
    """AC-2: asking for an unregistered strategy raises ``StrategyNotLinkedError`` with a full payload."""
    # Only okvis2 is registered; config asks for vins_mono.
    _register_okvis2(_OrderRecorder(constructed=[]))
    cfg = Config.with_blocks(c1_vio=_C1Block(strategy="vins_mono"))

    with pytest.raises(StrategyNotLinkedError) as excinfo:
        compose_root(cfg)

    error = excinfo.value
    assert error.strategy_name == "vins_mono"
    assert error.component_slug == "c1_vio"
    assert error.available_strategies == ["okvis2"]
|
|
|
|
|
|
def test_ac3_operator_excludes_airborne_only(_operator_env: None) -> None:
    """AC-3: ``compose_operator`` rejects a config that references an airborne-tier strategy."""
    # c1_vio is registered as airborne; an operator config that references it must fail.
    _register_okvis2(_OrderRecorder(constructed=[]))
    cfg = Config.with_blocks(c1_vio=_C1Block())

    with pytest.raises(StrategyNotLinkedError) as excinfo:
        compose_operator(cfg)

    error = excinfo.value
    assert error.component_slug == "c1_vio"
    # The explanation must mention the tier mismatch one way or the other.
    assert "airborne" in error.reason or "tier" in error.reason
|
|
|
|
|
|
def test_ac4_runtime_root_smoke_exit_zero(_airborne_env: None) -> None:
    """AC-4: a ``Config`` without component blocks composes into an empty ``RuntimeRoot``."""
    # A Config with no component blocks must compose cleanly (every required
    # component is hard-wired by its bootstrap; no strategy to resolve).
    composed = compose_root(Config())

    assert isinstance(composed, RuntimeRoot)
    assert composed.components == {}
|
|
|
|
|
|
def test_ac5_construction_order_respects_dependencies(_airborne_env: None) -> None:
    """AC-5: factories run in dependency order regardless of registration order."""
    order_log = _OrderRecorder(constructed=[])
    # Register in reverse order to make the topological pass non-trivial.
    for register in (_register_c5, _register_c4, _register_okvis2):
        register(order_log)

    cfg = Config.with_blocks(
        c1_vio=_C1Block(),
        c4_pose=_C4Block(),
        c5_state=_C5Block(),
    )
    composed = compose_root(cfg)

    # Dependencies must construct strictly before dependents.
    built = order_log.constructed
    assert built.index("c1_vio") < built.index("c5_state")
    assert built.index("c4_pose") < built.index("c5_state")
    assert composed.construction_order[-1] == "c5_state"
|
|
|
|
|
|
def _component_import_targets(
    node: ast.AST, package: tuple[str, ...], components_root: Path
) -> list[str]:
    """Return the ``gps_denied_onboard.components.*`` modules that *node* imports.

    Args:
        node: Any AST node; everything except ``import`` / ``from ... import``
            statements yields an empty list.
        package: Dotted package of the importing file, split into parts; used
            to resolve relative imports.
        components_root: Directory of the ``components`` package, used to tell
            submodules apart from plain attributes in
            ``from gps_denied_onboard.components import name``.

    Returns:
        Absolute dotted module names, each with at least three parts, so the
        component slug is always ``name.split(".")[2]``.
    """
    components_pkg = "gps_denied_onboard.components"
    inside = components_pkg + "."

    if isinstance(node, ast.Import):
        return [alias.name for alias in node.names if alias.name.startswith(inside)]
    if not isinstance(node, ast.ImportFrom):
        return []

    parts: list[str] = []
    if node.level:
        # Same rule the import system applies: one dot is the current package,
        # every additional dot climbs one parent.
        keep = len(package) - (node.level - 1)
        if keep <= 0:
            return []  # climbs above the top-level package; nothing to resolve
        parts.extend(package[:keep])
    if node.module:
        parts.append(node.module)
    source = ".".join(parts)

    if source.startswith(inside):
        return [source]
    if source != components_pkg:
        return []
    # ``from <components package> import name``: only names that exist on disk
    # as a subpackage or module are component imports; anything else is an
    # attribute of the package itself.
    return [
        inside + alias.name
        for alias in node.names
        if (components_root / alias.name).is_dir()
        or (components_root / f"{alias.name}.py").is_file()
    ]


def test_ac6_only_compose_root_imports_concrete_strategies() -> None:
    """Architecture lint: no module under ``components.*`` imports another component.

    We accept only:
    * the composition root (``runtime_root.py``), which lives outside
      ``components`` and is therefore never scanned;
    * imports that stay inside the component's own subpackage
      (e.g. ``components.c5_state`` importing ``components.c5_state.interface``).

    Imports across components (e.g. ``components.c5_state`` importing
    ``components.c1_vio.okvis2``) are violations, whichever spelling is used:
    ``import a.b.c``, ``from a.b.c import x``,
    ``from gps_denied_onboard.components import c1_vio`` or a relative
    ``from ..c1_vio import okvis2``.
    """
    components_root = _REPO_ROOT / "src" / "gps_denied_onboard" / "components"
    violations: list[str] = []
    for module_path in components_root.rglob("*.py"):
        relative = module_path.relative_to(components_root)
        own_component = relative.parts[0]
        # Package that relative imports inside this file resolve against.
        package = ("gps_denied_onboard", "components", *relative.parts[:-1])
        try:
            tree = ast.parse(module_path.read_text(encoding="utf-8"))
        except SyntaxError:
            # Unparseable files are skipped here; they fail elsewhere on import.
            continue
        for node in ast.walk(tree):
            for target in _component_import_targets(node, package, components_root):
                if target.split(".")[2] != own_component:
                    violations.append(
                        f"{module_path.relative_to(_REPO_ROOT)} imports {target}"
                    )
    assert not violations, (
        "components.* may not import other components — only the composition root may; "
        f"violations: {violations}"
    )
|
|
|
|
|
|
def test_nfr_reliability_partial_construction_closed_on_failure(_airborne_env: None) -> None:
    """NFR (reliability): when a later factory raises, already-built components are closed.

    ``c1_vio`` builds successfully and ``c5_state`` (which depends on it)
    raises. The original error must propagate and ``c1_vio.close()`` must
    have been called exactly once.
    """
    closed_slugs: list[str] = []

    class _Closable:
        """Component double that reports its slug when ``close()`` is called."""

        def __init__(self, slug: str) -> None:
            self.slug = slug

        def close(self) -> None:
            closed_slugs.append(self.slug)

    def good_factory(config: Config, components: dict[str, object]) -> _Closable:
        return _Closable("c1_vio")

    def failing_factory(config: Config, components: dict[str, object]) -> object:
        raise RuntimeError("boom from c5_state factory")

    register_strategy("c1_vio", "okvis2", good_factory, tier="airborne")
    register_strategy(
        "c5_state",
        "gtsam_isam2",
        failing_factory,
        tier="airborne",
        depends_on=("c1_vio",),
    )
    cfg = Config.with_blocks(c1_vio=_C1Block(), c5_state=_C5Block())

    with pytest.raises(RuntimeError, match=r"boom"):
        compose_root(cfg)

    assert closed_slugs == ["c1_vio"], "prior instances must be .close()d on mid-composition failure"
|
|
|
|
|
|
def test_list_registered_strategies_returns_sorted_names() -> None:
    """``list_registered_strategies`` returns one component's strategy names, sorted.

    The ``c1_vio`` strategies are deliberately registered out of alphabetical
    order (and interleaved with another component). Registering them already
    sorted would let an implementation that merely echoes insertion order pass.
    """
    register_strategy("c1_vio", "vins_mono", lambda c, m: None, tier="airborne")
    register_strategy("c2_vpr", "netvlad", lambda c, m: None, tier="airborne")
    register_strategy("c1_vio", "okvis2", lambda c, m: None, tier="airborne")

    assert list_registered_strategies("c1_vio") == ["okvis2", "vins_mono"]
    assert list_registered_strategies("c2_vpr") == ["netvlad"]
    # An unknown component yields an empty list rather than raising.
    assert list_registered_strategies("c99_unknown") == []
|