mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 21:41:12 +00:00
235eb4549e
Closes cumulative review batches 49-51 Finding F1 (Medium / Maintainability) -- the 7-way duplication of _assert_engine_output_dim across c2_vpr secondary VPR strategy modules. Add c2-internal helper assert_engine_output_dim(inference_runtime, handle, preprocessor, descriptor_dim, *, output_key='embedding', input_key='input') in src/gps_denied_onboard/components/c2_vpr/ _engine_dim_assertion.py. The helper runs a zero-init dry-run inference at preprocessor.input_shape() and asserts the engine output dict carries (1, descriptor_dim) under output_key. Raises gps_denied_onboard.config.schema.ConfigError on mismatch (preserving the prior error envelope and message wording byte-identically). Migrate 7 strategy modules (ultra_vpr, net_vlad, mega_loc, mix_vpr, sela_vpr, eigen_places, salad) to import the helper and delete the local _assert_engine_output_dim definitions + their inline 'AZ-527 (planned)' comments. NetVLAD is the only call site that overrides output_key='vlad_descriptor'; the other 6 explicitly pass output_key=_OUTPUT_KEY + input_key=_ENGINE_INPUT_KEY (matching helper defaults but documenting strategy contract at the call site). Add tests/unit/c2_vpr/test_az527_engine_dim_assertion.py (14 tests, AAA pattern, Protocol-conforming fakes) covering AC-1..AC-4: helper signature; wrong shape raises ConfigError naming both dims; missing output key raises ConfigError naming the missing key; AST-walk regression guard for stray definitions outside the helper module (modeled on AZ-526's test_ac4_az526_no_module_level_iso_ts_from_clock_outside_helper); import-grep regression guard verifying all 7 strategy modules import the helper. AC-5 (existing AZ-337/338/339/340 AC-6 sub-tests pass unmodified) is exercised transitively: c2_vpr/ full directory 230/230 PASS, no test file modified outside the new test_az527_*. AC-6 (AZ-270 + AZ-507 layer lints) verified by tests/unit/test_az270_compose_root.py 8/8 PASS. Code-review verdict: PASS (zero findings). Ruff clean. Co-authored-by: Cursor <cursoragent@cursor.com>
348 lines
11 KiB
Python
348 lines
11 KiB
Python
"""AZ-527 — c2_vpr engine output-dim assertion helper unit tests.
|
|
|
|
Covers AC-1..AC-6 of AZ-527. AC-5 is exercised transitively by the
|
|
existing AZ-337 / 338 / 339 / 340 AC-6 sub-tests (which run the
|
|
strategy ``create()`` end-to-end and now route through
|
|
``assert_engine_output_dim`` after this batch). AC-6 (layer
|
|
compliance) is exercised by ``test_az270_compose_root.py`` — this
|
|
file additionally adds a focused regression guard (AC-4) that walks
|
|
``src/`` for stray ``_assert_engine_output_dim`` /
|
|
``assert_engine_output_dim`` definitions outside the helper module.
|
|
|
|
The unit tests below use minimal fakes (Protocol-conforming) so the
|
|
helper can be exercised without spinning up a full strategy
|
|
``create()`` chain.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Literal
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.inference import (
|
|
BuildConfig,
|
|
EngineCacheEntry,
|
|
EngineHandle,
|
|
)
|
|
from gps_denied_onboard.components.c2_vpr._engine_dim_assertion import (
|
|
assert_engine_output_dim,
|
|
)
|
|
from gps_denied_onboard.config.schema import ConfigError
|
|
|
|
_REPO_ROOT = Path(__file__).resolve().parents[3]
|
|
_HELPER_PATH = (
|
|
_REPO_ROOT
|
|
/ "src"
|
|
/ "gps_denied_onboard"
|
|
/ "components"
|
|
/ "c2_vpr"
|
|
/ "_engine_dim_assertion.py"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fakes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class _FakePreprocessor:
|
|
"""Minimal :class:`BackbonePreprocessor` for the helper's input_shape() probe."""
|
|
|
|
hw: tuple[int, int] = (224, 224)
|
|
|
|
def preprocess(
|
|
self, frame: Any, calibration: Any
|
|
) -> np.ndarray: # pragma: no cover - unused by the helper
|
|
raise AssertionError("helper must not call preprocess()")
|
|
|
|
def input_shape(self) -> tuple[int, int]:
|
|
return self.hw
|
|
|
|
|
|
class _FakeRuntime:
|
|
"""Minimal :class:`InferenceRuntimeCut` returning a fixed output dict."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
output: dict[str, np.ndarray],
|
|
expected_input_key: str = "input",
|
|
expected_input_hw: tuple[int, int] = (224, 224),
|
|
) -> None:
|
|
self._output = output
|
|
self._expected_input_key = expected_input_key
|
|
self._expected_input_hw = expected_input_hw
|
|
self.last_input_key: str | None = None
|
|
self.last_input_shape: tuple[int, ...] | None = None
|
|
self.last_input_dtype: np.dtype | None = None
|
|
self.infer_call_count: int = 0
|
|
|
|
def compile_engine(
|
|
self, model_path: Path, build_config: BuildConfig
|
|
) -> EngineCacheEntry: # pragma: no cover - unused
|
|
raise AssertionError("helper must not call compile_engine()")
|
|
|
|
def deserialize_engine(
|
|
self, entry: EngineCacheEntry
|
|
) -> EngineHandle: # pragma: no cover - unused
|
|
raise AssertionError("helper must not call deserialize_engine()")
|
|
|
|
def infer(
|
|
self,
|
|
handle: EngineHandle,
|
|
inputs: dict[str, np.ndarray],
|
|
) -> dict[str, np.ndarray]:
|
|
self.infer_call_count += 1
|
|
assert len(inputs) == 1, f"expected one input key, got {sorted(inputs)!r}"
|
|
(key,) = inputs.keys()
|
|
self.last_input_key = key
|
|
tensor = inputs[key]
|
|
self.last_input_shape = tensor.shape
|
|
self.last_input_dtype = tensor.dtype
|
|
return self._output
|
|
|
|
def release_engine(self, handle: EngineHandle) -> None: # pragma: no cover
|
|
raise AssertionError("helper must not call release_engine()")
|
|
|
|
def current_runtime_label(
|
|
self,
|
|
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: # pragma: no cover
|
|
raise AssertionError("helper must not call current_runtime_label()")
|
|
|
|
|
|
_DUMMY_HANDLE: EngineHandle = "dummy-handle" # type: ignore[assignment]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-1: Helper exists at the canonical path with the expected signature
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_ac1_helper_callable_with_default_keys() -> None:
|
|
# Arrange
|
|
runtime = _FakeRuntime(
|
|
output={"embedding": np.zeros((1, 512), dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor(hw=(224, 224))
|
|
|
|
# Act
|
|
result = assert_engine_output_dim(
|
|
runtime, _DUMMY_HANDLE, preprocessor, descriptor_dim=512
|
|
)
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert runtime.infer_call_count == 1
|
|
assert runtime.last_input_key == "input"
|
|
assert runtime.last_input_shape == (1, 3, 224, 224)
|
|
assert runtime.last_input_dtype == np.dtype(np.float16)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-2: Wrong shape raises ConfigError naming both expected and actual dims
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"actual_shape,expected_dim",
|
|
[
|
|
((1, 999), 512),
|
|
((1, 4096), 2048),
|
|
((1, 100), 8448),
|
|
],
|
|
)
|
|
def test_ac2_wrong_descriptor_width_raises_config_error(
|
|
actual_shape: tuple[int, int], expected_dim: int
|
|
) -> None:
|
|
# Arrange
|
|
runtime = _FakeRuntime(
|
|
output={"embedding": np.zeros(actual_shape, dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor(hw=(224, 224))
|
|
|
|
# Act + Assert
|
|
with pytest.raises(ConfigError) as exc_info:
|
|
assert_engine_output_dim(
|
|
runtime, _DUMMY_HANDLE, preprocessor, descriptor_dim=expected_dim
|
|
)
|
|
msg = str(exc_info.value)
|
|
assert f"expected (1, {expected_dim})" in msg
|
|
assert f"got {actual_shape!r}".replace("'", "") in msg or str(actual_shape) in msg
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"actual_shape", [(512,), (2, 1, 512), (1, 1, 512), (1, 512, 1)]
|
|
)
|
|
def test_ac2_wrong_ndim_or_batch_raises_config_error(
|
|
actual_shape: tuple[int, ...],
|
|
) -> None:
|
|
# Arrange
|
|
runtime = _FakeRuntime(
|
|
output={"embedding": np.zeros(actual_shape, dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor()
|
|
|
|
# Act + Assert
|
|
with pytest.raises(ConfigError, match=r"engine output shape mismatch"):
|
|
assert_engine_output_dim(
|
|
runtime, _DUMMY_HANDLE, preprocessor, descriptor_dim=512
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-3: Missing output key raises ConfigError naming the missing key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_ac3_missing_default_output_key_raises_config_error() -> None:
|
|
# Arrange
|
|
runtime = _FakeRuntime(
|
|
output={"wrong_key": np.zeros((1, 512), dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor()
|
|
|
|
# Act + Assert
|
|
with pytest.raises(ConfigError) as exc_info:
|
|
assert_engine_output_dim(
|
|
runtime, _DUMMY_HANDLE, preprocessor, descriptor_dim=512
|
|
)
|
|
msg = str(exc_info.value)
|
|
assert "'embedding' key absent" in msg
|
|
assert "wrong_key" in msg
|
|
|
|
|
|
def test_ac3_missing_overridden_output_key_raises_config_error() -> None:
|
|
# Arrange — NetVLAD-style override
|
|
runtime = _FakeRuntime(
|
|
output={"embedding": np.zeros((1, 4096), dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor(hw=(480, 640))
|
|
|
|
# Act + Assert
|
|
with pytest.raises(ConfigError, match=r"'vlad_descriptor' key absent"):
|
|
assert_engine_output_dim(
|
|
runtime,
|
|
_DUMMY_HANDLE,
|
|
preprocessor,
|
|
descriptor_dim=4096,
|
|
output_key="vlad_descriptor",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC: Non-default keys (covers NetVLAD's overrides + future strategies)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_helper_accepts_non_default_output_and_input_keys() -> None:
|
|
# Arrange — NetVLAD-style: vlad_descriptor + custom input key
|
|
runtime = _FakeRuntime(
|
|
output={"vlad_descriptor": np.zeros((1, 4096), dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor(hw=(480, 640))
|
|
|
|
# Act
|
|
assert_engine_output_dim(
|
|
runtime,
|
|
_DUMMY_HANDLE,
|
|
preprocessor,
|
|
descriptor_dim=4096,
|
|
output_key="vlad_descriptor",
|
|
input_key="image",
|
|
)
|
|
|
|
# Assert
|
|
assert runtime.last_input_key == "image"
|
|
assert runtime.last_input_shape == (1, 3, 480, 640)
|
|
|
|
|
|
def test_helper_propagates_preprocessor_input_shape_to_probe_tensor() -> None:
|
|
# Arrange — SALAD-style 322x322 + non-square shape coverage
|
|
for hw in [(224, 224), (322, 322), (480, 480), (480, 640)]:
|
|
runtime = _FakeRuntime(
|
|
output={"embedding": np.zeros((1, 8448), dtype=np.float16)}
|
|
)
|
|
preprocessor = _FakePreprocessor(hw=hw)
|
|
|
|
# Act
|
|
assert_engine_output_dim(
|
|
runtime, _DUMMY_HANDLE, preprocessor, descriptor_dim=8448
|
|
)
|
|
|
|
# Assert
|
|
assert runtime.last_input_shape == (1, 3, hw[0], hw[1]), hw
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-4: Regression guard — no stray definitions outside the helper module
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_ac4_no_stray_engine_dim_assertion_definitions_outside_helper() -> None:
|
|
"""AC-4 (AZ-527): a `def assert_engine_output_dim` /
|
|
`def _assert_engine_output_dim` MUST exist only inside
|
|
`_engine_dim_assertion.py`. Any other definition under `src/`
|
|
means a c2_vpr strategy author slipped a copy back in.
|
|
|
|
Modeled on the AZ-508 / AZ-526 `test_no_local_iso_ts_*_definitions_remain`
|
|
pattern.
|
|
"""
|
|
# Arrange
|
|
src_root = _REPO_ROOT / "src"
|
|
offenders: list[tuple[Path, str]] = []
|
|
target_names = {"assert_engine_output_dim", "_assert_engine_output_dim"}
|
|
|
|
# Act
|
|
for path in src_root.rglob("*.py"):
|
|
if path == _HELPER_PATH:
|
|
continue
|
|
try:
|
|
tree = ast.parse(path.read_text(encoding="utf-8"))
|
|
except SyntaxError:
|
|
continue
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.FunctionDef) and node.name in target_names:
|
|
offenders.append((path.relative_to(_REPO_ROOT), node.name))
|
|
|
|
# Assert
|
|
assert offenders == [], (
|
|
"Found stray `assert_engine_output_dim` definitions outside the "
|
|
f"c2_vpr helper module: {offenders}"
|
|
)
|
|
|
|
|
|
def test_ac4_seven_strategy_modules_import_the_helper() -> None:
|
|
"""The 7 migrated strategy modules must import from the helper so
|
|
future hygiene cycles can rely on a single source of truth.
|
|
"""
|
|
# Arrange
|
|
c2_dir = _REPO_ROOT / "src" / "gps_denied_onboard" / "components" / "c2_vpr"
|
|
strategy_files = [
|
|
c2_dir / "ultra_vpr.py",
|
|
c2_dir / "net_vlad.py",
|
|
c2_dir / "mega_loc.py",
|
|
c2_dir / "mix_vpr.py",
|
|
c2_dir / "sela_vpr.py",
|
|
c2_dir / "eigen_places.py",
|
|
c2_dir / "salad.py",
|
|
]
|
|
expected_module = "gps_denied_onboard.components.c2_vpr._engine_dim_assertion"
|
|
missing: list[Path] = []
|
|
|
|
# Act
|
|
for path in strategy_files:
|
|
text = path.read_text(encoding="utf-8")
|
|
if expected_module not in text or "assert_engine_output_dim" not in text:
|
|
missing.append(path.relative_to(_REPO_ROOT))
|
|
|
|
# Assert
|
|
assert missing == [], (
|
|
f"Strategy modules missing the helper import: {missing}"
|
|
)
|