mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 10:31:13 +00:00
9a605c8514
Defines the public `ConditionalRefiner` Protocol (PEP 544 @runtime_checkable, two methods: `refine_if_needed` + `was_invoked`), extends `MatchResult` in-place with two default-valued refinement fields (`refinement_label`, `refinement_added_latency_ms`), defines the `RefinerError` family (`RefinerBackboneError`, `RefinerConfigError`), and ships the trivial `PassthroughRefiner` reference impl. Both refiner strategies are linked unconditionally — no `BUILD_REFINER_*` flag (NOT ADR-002 territory). Runtime selection only per ADR-001. `PassthroughRefiner` returns the input `MatchResult` by reference (bit-identical correspondences per contract INV-5) and always reports `was_invoked() is False`. Documentation: renames `module-layout.md` `c3_5_adhop` Public API symbol from `AdHoPRefinementStrategy` to `ConditionalRefiner` (AC-14) so the doc agrees with `description.md` and the contract. AC-9 (single-thread binding) deferred to AZ-270 runtime-root composition, mirroring AZ-336 / AZ-342 / AZ-344 Risk-4 precedent. AC-7 for the `"adhop"` strategy stops at `ModuleNotFoundError` because the AdHoP backbone is owned by AZ-349. All other ACs + NFRs covered by 36 new conformance tests. Architectural note: `PassthroughRefiner.inference_runtime` is typed as `object` because the L3→L3 import ban (`test_az270_compose_root`) forbids c3_5_adhop from importing c7_inference; the runtime-root factory narrows the type at construction time. Co-authored-by: Cursor <cursoragent@cursor.com>
491 lines
16 KiB
Python
491 lines
16 KiB
Python
"""AZ-348 — C3.5 ConditionalRefiner Protocol + DTO + error + factory conformance.
|
|
|
|
Covers AC-1..AC-14 + NFRs. AC-9 (single-thread binding) is
|
|
deferred per the task spec's Risk-4 escape clause — the generic
|
|
``compose_root`` thread-binding registry lives with AZ-270 and
|
|
the broader runtime-root composition. Each factory owns its own
|
|
thread binding today; this protocol task does not add a new
|
|
binding registry.
|
|
|
|
AC-7 (strategy resolution table) is asserted up to the lookup
|
|
point for ``"adhop"``: the AdHoP module is the AZ-349 placeholder
|
|
that does not exist yet, so the assertion on that path stops at
|
|
``ModuleNotFoundError`` — explicitly documented as the "import +
|
|
class lookup not __init__" rule in the task spec.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import logging
|
|
import sys
|
|
import time
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.matcher import (
|
|
CandidateMatchSet,
|
|
MatchResult,
|
|
)
|
|
from gps_denied_onboard.components.c3_5_adhop import (
|
|
C3_5RefinerConfig,
|
|
ConditionalRefiner,
|
|
)
|
|
from gps_denied_onboard.components.c3_5_adhop.config import KNOWN_STRATEGIES
|
|
from gps_denied_onboard.components.c3_5_adhop.errors import (
|
|
RefinerBackboneError,
|
|
RefinerConfigError,
|
|
RefinerError,
|
|
)
|
|
from gps_denied_onboard.components.c3_5_adhop.passthrough_refiner import (
|
|
PassthroughRefiner,
|
|
create,
|
|
)
|
|
from gps_denied_onboard.config.schema import Config, ConfigError
|
|
from gps_denied_onboard.runtime_root.refiner_factory import build_refiner_strategy
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Fakes.
|
|
|
|
|
|
class _FakeRansacFilter:
|
|
def filter(self, *args, **kwargs):
|
|
raise NotImplementedError
|
|
|
|
|
|
class _FakeInferenceRuntime:
|
|
def deserialize_engine(self, *args, **kwargs):
|
|
raise NotImplementedError
|
|
|
|
def thermal_state(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class _FakeFrame:
|
|
frame_id = 7
|
|
|
|
|
|
class _PartialRefinerMissingWasInvoked:
|
|
def refine_if_needed(self, frame, mr, residual_threshold_px):
|
|
raise NotImplementedError
|
|
|
|
|
|
class _PartialRefinerMissingRefine:
|
|
def was_invoked(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
def _config_with_strategy(
|
|
strategy: str = "passthrough",
|
|
*,
|
|
threshold: float = 2.5,
|
|
) -> Config:
|
|
return Config.with_blocks(
|
|
c3_5_adhop=C3_5RefinerConfig(
|
|
strategy=strategy,
|
|
residual_threshold_px=threshold,
|
|
)
|
|
)
|
|
|
|
|
|
def _make_candidate(*, inliers: int = 40, residual: float = 1.0) -> CandidateMatchSet:
|
|
return CandidateMatchSet(
|
|
tile_id=(18, 49.9, 36.3),
|
|
inlier_count=inliers,
|
|
inlier_correspondences=np.ones((inliers, 4), dtype=np.float32) * 0.5,
|
|
ransac_outlier_count=3,
|
|
per_candidate_residual_px=residual,
|
|
)
|
|
|
|
|
|
def _make_result(
|
|
*,
|
|
reprojection_residual: float = 1.0,
|
|
refinement_label: str = "passthrough",
|
|
refinement_latency_ms: float = 0.0,
|
|
) -> MatchResult:
|
|
candidate = _make_candidate()
|
|
return MatchResult(
|
|
frame_id=7,
|
|
per_candidate=(candidate,),
|
|
best_candidate_idx=0,
|
|
reprojection_residual_px=reprojection_residual,
|
|
matched_at=1_000_000_000,
|
|
matcher_label="disk_lightglue",
|
|
candidates_input=3,
|
|
candidates_dropped=2,
|
|
refinement_label=refinement_label,
|
|
refinement_added_latency_ms=refinement_latency_ms,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1: Protocol conformance.
|
|
|
|
|
|
def test_ac1_passthrough_refiner_conformance() -> None:
|
|
refiner = PassthroughRefiner(
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert isinstance(refiner, ConditionalRefiner)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"partial_cls",
|
|
[_PartialRefinerMissingWasInvoked, _PartialRefinerMissingRefine],
|
|
)
|
|
def test_ac1_partial_refiners_fail_conformance(partial_cls) -> None:
|
|
assert not isinstance(partial_cls(), ConditionalRefiner)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2: MatchResult backward-compatible defaults.
|
|
|
|
|
|
def test_ac2_match_result_construction_without_new_fields() -> None:
|
|
candidate = _make_candidate()
|
|
mr = MatchResult(
|
|
frame_id=7,
|
|
per_candidate=(candidate,),
|
|
best_candidate_idx=0,
|
|
reprojection_residual_px=candidate.per_candidate_residual_px,
|
|
matched_at=1_000_000_000,
|
|
matcher_label="disk_lightglue",
|
|
candidates_input=3,
|
|
candidates_dropped=2,
|
|
)
|
|
assert mr.refinement_label == "passthrough"
|
|
assert mr.refinement_added_latency_ms == 0.0
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3: MatchResult immutability + slots preserved.
|
|
|
|
|
|
def test_ac3_match_result_slots_include_new_fields() -> None:
|
|
assert "refinement_label" in MatchResult.__slots__
|
|
assert "refinement_added_latency_ms" in MatchResult.__slots__
|
|
|
|
|
|
def test_ac3_match_result_remains_frozen() -> None:
|
|
mr = _make_result()
|
|
with pytest.raises(dataclasses.FrozenInstanceError):
|
|
mr.refinement_label = "adhop"
|
|
with pytest.raises(dataclasses.FrozenInstanceError):
|
|
mr.refinement_added_latency_ms = 12.5
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4: Factory rejects unknown strategy. (Validated at config-load.)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"bad_label",
|
|
["ADHOP", "passthrough_v2", "garbage", "", "default"],
|
|
)
|
|
def test_ac4_unknown_strategy_rejected_at_config_load(bad_label: str) -> None:
|
|
with pytest.raises(ConfigError) as exc_info:
|
|
C3_5RefinerConfig(strategy=bad_label)
|
|
msg = str(exc_info.value)
|
|
for valid in KNOWN_STRATEGIES:
|
|
assert valid in msg
|
|
|
|
|
|
def test_ac4_factory_emits_error_log_on_unknown_strategy(caplog) -> None:
|
|
# Build a Config that bypasses __post_init__ validation by
|
|
# constructing the block via dataclasses.replace on an already-valid
|
|
# block — this is what catches the defensive factory path.
|
|
valid_block = C3_5RefinerConfig(strategy="passthrough")
|
|
object.__setattr__(valid_block, "strategy", "garbage")
|
|
config = Config.with_blocks(c3_5_adhop=valid_block)
|
|
with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c3_5_adhop"):
|
|
with pytest.raises(RefinerConfigError) as exc_info:
|
|
build_refiner_strategy(
|
|
config,
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert "Unknown refiner strategy: garbage" in str(exc_info.value)
|
|
assert any(
|
|
r.message == "c3_5.refiner.strategy_unknown" for r in caplog.records
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5: Factory rejects invalid threshold.
|
|
|
|
|
|
@pytest.mark.parametrize("bad_threshold", [0.0, -0.1, -10.0])
|
|
def test_ac5_invalid_threshold_rejected_at_config_load(bad_threshold: float) -> None:
|
|
with pytest.raises(ConfigError):
|
|
C3_5RefinerConfig(residual_threshold_px=bad_threshold)
|
|
|
|
|
|
def test_ac5_factory_emits_error_log_on_invalid_threshold(caplog) -> None:
|
|
valid_block = C3_5RefinerConfig(strategy="passthrough")
|
|
object.__setattr__(valid_block, "residual_threshold_px", 0.0)
|
|
config = Config.with_blocks(c3_5_adhop=valid_block)
|
|
with caplog.at_level(logging.ERROR, logger="gps_denied_onboard.c3_5_adhop"):
|
|
with pytest.raises(RefinerConfigError):
|
|
build_refiner_strategy(
|
|
config,
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert any(
|
|
r.message == "c3_5.refiner.invalid_threshold" for r in caplog.records
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6: Successful factory load emits INFO log.
|
|
|
|
|
|
def test_ac6_factory_emits_info_log_on_success(caplog) -> None:
|
|
config = _config_with_strategy("passthrough", threshold=2.5)
|
|
with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c3_5_adhop"):
|
|
instance = build_refiner_strategy(
|
|
config,
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert isinstance(instance, ConditionalRefiner)
|
|
records = [
|
|
r for r in caplog.records if r.message == "c3_5.refiner.strategy_loaded"
|
|
]
|
|
assert len(records) == 1
|
|
record = records[0]
|
|
assert getattr(record, "strategy", None) == "passthrough"
|
|
assert getattr(record, "residual_threshold_px", None) == 2.5
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7: Strategy resolution table.
|
|
|
|
|
|
def test_ac7_passthrough_resolution() -> None:
|
|
config = _config_with_strategy("passthrough")
|
|
instance = build_refiner_strategy(
|
|
config,
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert isinstance(instance, PassthroughRefiner)
|
|
assert isinstance(instance, ConditionalRefiner)
|
|
assert (
|
|
"gps_denied_onboard.components.c3_5_adhop.passthrough_refiner"
|
|
in sys.modules
|
|
)
|
|
|
|
|
|
def test_ac7_adhop_resolution_stops_at_module_lookup() -> None:
|
|
"""Task spec: the full-success path for "adhop" belongs to the
|
|
AdHoP task (AZ-349); this assertion verifies the factory reaches
|
|
the import step but the module does not exist yet.
|
|
"""
|
|
config = _config_with_strategy("adhop")
|
|
with pytest.raises(ModuleNotFoundError):
|
|
build_refiner_strategy(
|
|
config,
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8: Public API.
|
|
|
|
|
|
def test_ac8_public_api_re_exports() -> None:
|
|
from gps_denied_onboard.components import c3_5_adhop
|
|
|
|
assert "ConditionalRefiner" in c3_5_adhop.__all__
|
|
assert "C3_5RefinerConfig" in c3_5_adhop.__all__
|
|
|
|
|
|
def test_ac8_internals_not_in_public_api() -> None:
|
|
from gps_denied_onboard.components import c3_5_adhop
|
|
|
|
for internal in (
|
|
"PassthroughRefiner",
|
|
"AdHoPRefiner",
|
|
"RefinerError",
|
|
"RefinerBackboneError",
|
|
"RefinerConfigError",
|
|
):
|
|
assert internal not in c3_5_adhop.__all__, (
|
|
f"internal name leaked into public API: {internal}"
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9: deferred.
|
|
|
|
|
|
def test_ac9_single_thread_binding_deferred() -> None:
|
|
"""AC-9 (single-thread binding) is deferred per task spec
|
|
Risk 4: the generic ``compose_root`` thread-binding registry
|
|
lives with AZ-270 and the broader runtime-root composition.
|
|
Each factory owns its own thread binding today; this protocol
|
|
task does not add a new binding registry."""
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10: PassthroughRefiner byte-identical correspondences.
|
|
|
|
|
|
def test_ac10_passthrough_returns_same_match_result_reference() -> None:
|
|
refiner = create(
|
|
_config_with_strategy(),
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
mr = _make_result()
|
|
out = refiner.refine_if_needed(_FakeFrame(), mr, residual_threshold_px=2.5)
|
|
assert out is mr
|
|
assert out.refinement_label == "passthrough"
|
|
assert out.refinement_added_latency_ms == 0.0
|
|
|
|
|
|
def test_ac10_passthrough_correspondences_bit_identical() -> None:
|
|
refiner = PassthroughRefiner(
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
mr = _make_result()
|
|
out = refiner.refine_if_needed(_FakeFrame(), mr, residual_threshold_px=2.5)
|
|
for in_cand, out_cand in zip(mr.per_candidate, out.per_candidate, strict=True):
|
|
assert np.array_equal(
|
|
out_cand.inlier_correspondences, in_cand.inlier_correspondences
|
|
)
|
|
assert out_cand.inlier_correspondences.dtype == in_cand.inlier_correspondences.dtype
|
|
# Same object reference — INV-5 forbids silent copies on the
|
|
# passthrough path.
|
|
assert out_cand.inlier_correspondences is in_cand.inlier_correspondences
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-11: was_invoked always False on PassthroughRefiner.
|
|
|
|
|
|
def test_ac11_was_invoked_always_false_on_passthrough() -> None:
|
|
refiner = PassthroughRefiner(
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
assert refiner.was_invoked() is False
|
|
mr = _make_result()
|
|
for _ in range(10):
|
|
refiner.refine_if_needed(_FakeFrame(), mr, residual_threshold_px=2.5)
|
|
assert refiner.was_invoked() is False
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-12: Threshold validation in refine_if_needed.
|
|
|
|
|
|
@pytest.mark.parametrize("bad_threshold", [0.0, -0.1, -50.0])
|
|
def test_ac12_threshold_validation_in_method(bad_threshold: float) -> None:
|
|
refiner = PassthroughRefiner(
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
mr = _make_result()
|
|
with pytest.raises(ValueError):
|
|
refiner.refine_if_needed(
|
|
_FakeFrame(), mr, residual_threshold_px=bad_threshold
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-13: Error hierarchy.
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"exc_factory",
|
|
[RefinerBackboneError, RefinerConfigError],
|
|
)
|
|
def test_ac13_all_refiner_errors_caught_as_family(exc_factory) -> None:
|
|
with pytest.raises(RefinerError):
|
|
raise exc_factory("boom")
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-14: module-layout.md symbol rename.
|
|
|
|
|
|
def test_ac14_module_layout_references_conditional_refiner() -> None:
|
|
from pathlib import Path
|
|
|
|
doc = Path("_docs/02_document/module-layout.md").read_text()
|
|
assert "ConditionalRefiner" in doc, (
|
|
"module-layout.md must reference the canonical Public API symbol"
|
|
)
|
|
# Old symbol name must NOT be present (per AC-14).
|
|
assert "AdHoPRefinementStrategy" not in doc, (
|
|
"module-layout.md still references the legacy AdHoPRefinementStrategy "
|
|
"symbol; rename per AZ-348 AC-14"
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# NFRs.
|
|
|
|
|
|
def test_nfr_perf_factory_under_20ms_p99(caplog) -> None:
|
|
config = _config_with_strategy("passthrough")
|
|
ransac_filter = _FakeRansacFilter()
|
|
inference_runtime = _FakeInferenceRuntime()
|
|
durations_ms: list[float] = []
|
|
for _ in range(100):
|
|
t0 = time.perf_counter()
|
|
build_refiner_strategy(
|
|
config,
|
|
ransac_filter=ransac_filter,
|
|
inference_runtime=inference_runtime,
|
|
)
|
|
durations_ms.append((time.perf_counter() - t0) * 1000.0)
|
|
durations_ms.sort()
|
|
p99 = durations_ms[int(0.99 * len(durations_ms))]
|
|
assert p99 <= 20.0, f"factory p99={p99:.2f} ms exceeded 20 ms budget"
|
|
|
|
|
|
def test_nfr_perf_passthrough_under_0_5ms_p99() -> None:
|
|
refiner = PassthroughRefiner(
|
|
ransac_filter=_FakeRansacFilter(),
|
|
inference_runtime=_FakeInferenceRuntime(),
|
|
)
|
|
mr = _make_result()
|
|
frame = _FakeFrame()
|
|
durations_us: list[float] = []
|
|
for _ in range(10_000):
|
|
t0 = time.perf_counter()
|
|
refiner.refine_if_needed(frame, mr, residual_threshold_px=2.5)
|
|
durations_us.append((time.perf_counter() - t0) * 1_000_000.0)
|
|
durations_us.sort()
|
|
p99_us = durations_us[int(0.99 * len(durations_us))]
|
|
assert p99_us <= 500.0, f"refine p99={p99_us:.2f} us exceeded 500 us budget"
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Surface coverage — config defaults.
|
|
|
|
|
|
def test_c3_5_config_defaults() -> None:
|
|
cfg = C3_5RefinerConfig()
|
|
assert cfg.strategy == "adhop"
|
|
assert cfg.residual_threshold_px == 2.5
|
|
assert cfg.invocation_rate_warn_threshold == 0.25
|
|
|
|
|
|
def test_c3_5_config_invocation_rate_validation() -> None:
|
|
with pytest.raises(ConfigError):
|
|
C3_5RefinerConfig(invocation_rate_warn_threshold=0.0)
|
|
with pytest.raises(ConfigError):
|
|
C3_5RefinerConfig(invocation_rate_warn_threshold=1.0)
|
|
with pytest.raises(ConfigError):
|
|
C3_5RefinerConfig(invocation_rate_warn_threshold=-0.5)
|