Files
gps-denied-onboard/tests/unit/test_az283_descriptor_normaliser.py
T
Oleksandr Bezdieniezhnykh af0dbe863a [AZ-338] [AZ-283] C2 NetVLAD mandatory simple-baseline VprStrategy
NetVLAD is the C2 comparative baseline per the engine rule (every
production-default backbone ships with a simple-baseline alongside).
Runs on the C7 PyTorch FP16 runtime (NOT TRT) so a TRT engine compile
bug cannot simultaneously break NetVLAD AND UltraVPR.

Production changes:
- c2_vpr/net_vlad.py — NetVladStrategy + module-level create() factory.
  Constructor wires InferenceRuntimeCut + DescriptorIndexCut +
  NetVladBackbonePreprocessor + DescriptorNormaliser + FaissBridge.
  embed_query pipeline: preprocess -> runtime.infer -> dual-stage
  normalisation (intra-cluster THEN global L2) -> VprQuery.
  retrieve_topk delegates one-line to FaissBridge.
- c2_vpr/_net_vlad_architecture.py — Arandjelovic et al. 2016 NetVLAD
  layer over torchvision VGG16 features + optional Linear PCA
  projection to descriptor_dim (default 4096; published Pittsburgh
  reference uses K*D=64*512=32768 raw + Linear(32768, 4096) PCA).
- c2_vpr/_preprocessor_net_vlad.py — OpenCV-based image preprocessor:
  decode -> centre-crop square -> resize (480, 480) -> ImageNet
  normalisation -> FP16 NCHW. Calibration is not consumed (NetVLAD
  is calibration-agnostic per published preprocessing chain).
- c2_vpr/inference_runtime_cut.py — NEW AZ-507 consumer-side cut
  mirroring C7 InferenceRuntime; lets c2_vpr stay AZ-507-clean.
- c2_vpr/config.py — added netvlad_descriptor_dim: int = 4096 knob.
- helpers/descriptor_normaliser.py — added intra_cluster_normalise
  (DescriptorNormaliser v1.0.0 -> v1.1.0; backward-compatible add).
- runtime_root/vpr_factory.py — added _register_strategy_architecture
  helper that binds (MODEL_NAME, architecture_factory(descriptor_dim))
  to C7's architecture registry before delegating to the strategy's
  create() factory. Keeps the c7 import at L4, preserves AZ-507.
- fdr_client/records.py — registered vpr.embed_query,
  vpr.backbone_error, vpr.preprocess_error record kinds.

Tests:
- tests/unit/c2_vpr/test_net_vlad.py — 31 tests covering all 11 ACs +
  preprocessor contract + architecture factory + constructor
  validation + FDR record emission.
- tests/unit/test_az283_descriptor_normaliser.py — +8 tests for the
  new intra_cluster_normalise.
- tests/unit/test_az272_fdr_record_schema.py — +3 fixture payloads.

Full unit suite: 1608 passed / 80 env-skipped (+43 new tests).
Per-batch code review (batch_46_review.md): PASS_WITH_WARNINGS
(4 Low-severity hygiene findings; no Critical/High/Medium).

Architectural notes:
- The spec implied c2_vpr.net_vlad.create() registers the architecture
  with C7. That violates AZ-507 (no cross-component imports). Resolved
  by exposing MODEL_NAME + architecture_factory(descriptor_dim) on the
  strategy module and having the composition root perform the C7 bind.
- C7 PyTorch runtime API names in the spec (forward, load_engine)
  were outdated; aligned implementation with the live v1.0.0 Protocol
  (infer, compile_engine + deserialize_engine). Spec hygiene flagged
  in review F2.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 22:30:29 +03:00

200 lines
7.7 KiB
Python

"""AZ-283 — DescriptorNormaliser helper AC tests.

Verifies the contract at ``_docs/02_document/contracts/shared_helpers/descriptor_normaliser.md`` v1.0.0,
plus the ``intra_cluster_normalise`` addition introduced in v1.1.0 (AZ-338).
"""
from __future__ import annotations
import ast
from pathlib import Path
import numpy as np
import pytest
from gps_denied_onboard.helpers import DescriptorNormaliser, DescriptorNormaliserError
def test_ac1_unit_vector_example() -> None:
    """AC1: the 3-4 vector normalises to (0.6, 0.8) and has unit L2 norm."""
    # Arrange
    vector = np.array([3.0, 4.0], dtype=np.float32)
    expected = np.array([0.6, 0.8], dtype=np.float32)
    # Act
    normalised = DescriptorNormaliser.l2_normalise(vector)
    # Assert
    np.testing.assert_allclose(normalised, expected, atol=1e-6)
    length = float(np.linalg.norm(normalised))
    assert length == pytest.approx(1.0, abs=1e-6)
def test_ac2_batch_normalisation() -> None:
    """AC2: each row of a 2-D batch is normalised to unit L2 norm."""
    # Arrange
    rows = np.array([[3.0, 4.0], [1.0, 0.0]], dtype=np.float32)
    expected_rows = (
        np.array([0.6, 0.8], dtype=np.float32),
        np.array([1.0, 0.0], dtype=np.float32),
    )
    # Act
    normalised = DescriptorNormaliser.l2_normalise_batch(rows)
    # Assert — row values first, then the norm of every returned row
    for index, expected in enumerate(expected_rows):
        np.testing.assert_allclose(normalised[index], expected, atol=1e-6)
    for normalised_row in normalised:
        row_length = float(np.linalg.norm(normalised_row))
        assert row_length == pytest.approx(1.0, abs=1e-6)
def test_ac3_fp16_dtype_preservation() -> None:
    """AC3: float16 input yields float16 output with norm 1.0 (abs tol 1e-3)."""
    # Arrange — fixed seed keeps the sample reproducible
    generator = np.random.default_rng(2026)
    half_vector = generator.standard_normal(512).astype(np.float16)
    # Act
    normalised = DescriptorNormaliser.l2_normalise(half_vector)
    # Assert
    assert normalised.dtype == np.float16
    # Upcast to float32 before taking the norm.
    upcast = normalised.astype(np.float32)
    assert float(np.linalg.norm(upcast)) == pytest.approx(1.0, abs=1e-3)
def test_ac4_fp32_dtype_preservation() -> None:
    """AC4: float32 input yields float32 output with norm 1.0 (abs tol 1e-6)."""
    # Arrange — fixed seed keeps the sample reproducible
    generator = np.random.default_rng(2026)
    single_vector = generator.standard_normal(512).astype(np.float32)
    # Act
    normalised = DescriptorNormaliser.l2_normalise(single_vector)
    # Assert
    assert normalised.dtype == np.float32
    length = float(np.linalg.norm(normalised))
    assert length == pytest.approx(1.0, abs=1e-6)
def test_ac5_zero_vector_handling() -> None:
    """AC5: an all-zero vector is returned as all zeros, with no NaN entries."""
    # Arrange
    zero_vector = np.zeros(128, dtype=np.float32)
    # Act
    normalised = DescriptorNormaliser.l2_normalise(zero_vector)
    # Assert
    np.testing.assert_array_equal(normalised, zero_vector)
    assert not np.isnan(normalised).any()
def test_ac5b_zero_row_in_batch_remains_zero() -> None:
    """AC5b: a zero row inside a batch stays zero while the other row is normalised."""
    # Arrange — row 0 is all zeros, row 1 is already a unit vector
    rows = np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]], dtype=np.float32)
    expected_zero_row = np.zeros(3, dtype=np.float32)
    expected_unit_row = np.array([1.0, 0.0, 0.0], dtype=np.float32)
    # Act
    normalised = DescriptorNormaliser.l2_normalise_batch(rows)
    # Assert
    np.testing.assert_array_equal(normalised[0], expected_zero_row)
    np.testing.assert_allclose(normalised[1], expected_unit_row)
def test_ac6_idempotence_fp32() -> None:
    """AC6: normalising a float32 vector twice is byte-identical to normalising once."""
    # Arrange
    generator = np.random.default_rng(2026)
    vector = generator.standard_normal(64).astype(np.float32)
    # Act
    first_pass = DescriptorNormaliser.l2_normalise(vector)
    second_pass = DescriptorNormaliser.l2_normalise(first_pass)
    # Assert — compare raw buffers, not approximate values
    first_bytes = first_pass.tobytes()
    second_bytes = second_pass.tobytes()
    assert first_bytes == second_bytes
def test_ac7_idempotence_fp16_within_half_precision_tol() -> None:
    """AC7: a second float16 normalisation matches the first within atol=1e-3."""
    # Arrange
    generator = np.random.default_rng(2026)
    half_vector = generator.standard_normal(64).astype(np.float16)
    # Act
    first_pass = DescriptorNormaliser.l2_normalise(half_vector)
    second_pass = DescriptorNormaliser.l2_normalise(first_pass)
    # Assert — both passes are upcast to float32 for the comparison
    np.testing.assert_allclose(
        second_pass.astype(np.float32),
        first_pass.astype(np.float32),
        atol=1e-3,
    )
def test_ac8_no_in_place_mutation() -> None:
    """AC8: ``l2_normalise`` leaves the caller's array untouched."""
    # Arrange — keep an independent copy to compare against afterwards
    vector = np.array([3.0, 4.0, 0.0], dtype=np.float32)
    before = vector.copy()
    # Act — the return value is irrelevant here
    DescriptorNormaliser.l2_normalise(vector)
    # Assert
    np.testing.assert_array_equal(vector, before)
def test_ac9_metric_is_inner_product_exact_string() -> None:
    """AC9: ``descriptor_metric()`` returns exactly the string ``inner_product``."""
    metric = DescriptorNormaliser.descriptor_metric()
    assert metric == "inner_product"
def test_ac10_float64_dtype_rejected() -> None:
    """AC10: float64 input raises an error whose message names float16 and float32."""
    # Arrange
    double_vector = np.array([1.0, 2.0], dtype=np.float64)
    message_pattern = r"float16.*float32|float32.*float16"
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=message_pattern):
        DescriptorNormaliser.l2_normalise(double_vector)
def test_ac11_shape_contract_single_rejects_2d() -> None:
    """AC11: the single-vector entry point rejects 2-D input with a "1-D" message."""
    # Arrange
    matrix = np.zeros((2, 3), dtype=np.float32)
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=r"1-D"):
        DescriptorNormaliser.l2_normalise(matrix)
def test_ac11_shape_contract_batch_rejects_1d() -> None:
    """AC11: the batch entry point rejects 1-D input with a "2-D" message."""
    # Arrange
    flat_vector = np.zeros(128, dtype=np.float32)
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=r"2-D"):
        DescriptorNormaliser.l2_normalise_batch(flat_vector)
def test_ac12_no_upward_imports_to_components() -> None:
    """AC12: the helper module must not import from ``gps_denied_onboard.components``.

    The source of ``helpers/descriptor_normaliser.py`` is parsed with
    :mod:`ast` (it is never executed) and every import statement is inspected.
    The following spellings are all reported:

    * ``import gps_denied_onboard.components...``
    * ``from gps_denied_onboard.components... import x``
    * ``from gps_denied_onboard import components``
    * relative forms, e.g. ``from ..components import x`` or
      ``from .. import components``

    Raises:
        AssertionError: if any offending import is found; the message lists
            the fully-qualified names that were resolved.
    """
    forbidden = "gps_denied_onboard.components"
    src_root = Path(__file__).resolve().parents[2] / "src"
    module_path = src_root / "gps_denied_onboard" / "helpers" / "descriptor_normaliser.py"
    # Package containing the inspected module; relative imports resolve against it.
    package_parts = list(module_path.relative_to(src_root).parent.parts)

    tree = ast.parse(module_path.read_text(encoding="utf-8"))
    bad: list[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            bad.extend(
                alias.name for alias in node.names if alias.name.startswith(forbidden)
            )
        elif isinstance(node, ast.ImportFrom):
            level = node.level or 0
            if level:
                # level=1 is the containing package; each further dot climbs one package up.
                keep = max(len(package_parts) - (level - 1), 0)
                origin_parts = package_parts[:keep]
            else:
                origin_parts = []
            if node.module:
                origin_parts = [*origin_parts, node.module]
            origin = ".".join(origin_parts)
            if origin.startswith(forbidden):
                bad.append(origin)
                continue
            # ``from <parent> import components`` names the package via the alias instead.
            for alias in node.names:
                candidate = f"{origin}.{alias.name}" if origin else alias.name
                if candidate == forbidden:
                    bad.append(candidate)
    assert not bad, f"descriptor_normaliser must not import components.*; found: {bad}"
# AZ-338 — DescriptorNormaliser v1.1.0: intra_cluster_normalise
def test_intra_cluster_normalise_per_cluster_unit_norm() -> None:
    """Every cluster-sized slice of the output has unit L2 norm."""
    # Arrange — flat descriptor of 4 clusters x 3 dims, residuals not yet normalised
    cluster_count = 4
    residuals = np.array(
        [3.0, 4.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0, 12.0, 2.0, 2.0, 1.0],
        dtype=np.float32,
    )
    # Act
    normalised = DescriptorNormaliser.intra_cluster_normalise(
        residuals, num_clusters=cluster_count
    )
    # Assert — view the flat output as one row per cluster and check each row
    for cluster in normalised.reshape(cluster_count, 3):
        cluster_length = float(np.linalg.norm(cluster))
        assert cluster_length == pytest.approx(1.0, abs=1e-6)
def test_intra_cluster_normalise_dtype_preserved_fp16() -> None:
    """float16 input stays float16 and each cluster has norm 1.0 (abs tol 1e-3)."""
    # Arrange — 64 clusters of 32 dims, reproducible via a fixed seed
    generator = np.random.default_rng(2026)
    half_descriptor = generator.standard_normal(64 * 32).astype(np.float16)
    # Act
    normalised = DescriptorNormaliser.intra_cluster_normalise(
        half_descriptor, num_clusters=64
    )
    # Assert
    assert normalised.dtype == np.float16
    # Upcast to float32 before measuring the per-cluster norms.
    per_cluster = normalised.astype(np.float32).reshape(64, 32)
    for cluster in per_cluster:
        cluster_length = float(np.linalg.norm(cluster))
        assert cluster_length == pytest.approx(1.0, abs=1e-3)
def test_intra_cluster_normalise_zero_cluster_returns_zero() -> None:
    """An all-zero cluster stays zero while its neighbour is normalised."""
    # Arrange — 2 clusters of dim 3: the first is all zeros, the second is (0, 3, 4)
    descriptor = np.array([0.0, 0.0, 0.0, 0.0, 3.0, 4.0], dtype=np.float32)
    expected_first = np.zeros(3, dtype=np.float32)
    expected_second = np.array([0.0, 0.6, 0.8], dtype=np.float32)
    # Act
    normalised = DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=2)
    # Assert
    first_cluster = normalised[:3]
    second_cluster = normalised[3:]
    np.testing.assert_array_equal(first_cluster, expected_first)
    np.testing.assert_allclose(second_cluster, expected_second, atol=1e-6)
def test_intra_cluster_normalise_rejects_non_divisible_length() -> None:
    """A length-7 descriptor with ``num_clusters=3`` raises a "not divisible" error."""
    # Arrange
    descriptor = np.zeros(7, dtype=np.float32)
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=r"not divisible"):
        DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=3)
def test_intra_cluster_normalise_rejects_2d_input() -> None:
    """A 2-D array is rejected with an error mentioning "1-D"."""
    # Arrange
    matrix = np.zeros((4, 3), dtype=np.float32)
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=r"1-D"):
        DescriptorNormaliser.intra_cluster_normalise(matrix, num_clusters=4)
def test_intra_cluster_normalise_rejects_zero_num_clusters() -> None:
    """``num_clusters=0`` is rejected with an error mentioning ">= 1"."""
    # Arrange
    descriptor = np.zeros(12, dtype=np.float32)
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=r">= 1"):
        DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=0)
def test_intra_cluster_normalise_rejects_bool_num_clusters() -> None:
    """``num_clusters=True`` is rejected with an error mentioning "non-bool"."""
    # Arrange
    descriptor = np.zeros(12, dtype=np.float32)
    # Act / Assert — the bool is passed on purpose, hence the type-checker suppression
    with pytest.raises(DescriptorNormaliserError, match=r"non-bool"):
        DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=True)  # type: ignore[arg-type]
def test_intra_cluster_normalise_rejects_float64() -> None:
    """float64 input raises an error whose message names float16 and float32."""
    # Arrange
    double_descriptor = np.zeros(12, dtype=np.float64)
    message_pattern = r"float16.*float32|float32.*float16"
    # Act / Assert
    with pytest.raises(DescriptorNormaliserError, match=message_pattern):
        DescriptorNormaliser.intra_cluster_normalise(double_descriptor, num_clusters=4)
def test_intra_cluster_normalise_no_in_place_mutation() -> None:
    """``intra_cluster_normalise`` leaves the caller's array untouched."""
    # Arrange — keep an independent copy to compare against afterwards
    descriptor = np.array(
        [3.0, 4.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0, 12.0, 2.0, 2.0, 1.0],
        dtype=np.float32,
    )
    before = descriptor.copy()
    # Act — the return value is irrelevant here
    DescriptorNormaliser.intra_cluster_normalise(descriptor, num_clusters=4)
    # Assert
    np.testing.assert_array_equal(descriptor, before)