[AZ-344] C3 CrossDomainMatcher Protocol + factory + RollingHealthWindow

Defines the public `CrossDomainMatcher` Protocol (PEP 544
@runtime_checkable, two methods: `match` + `health_snapshot`),
the three frozen+slotted DTOs (`CandidateMatchSet`, `MatchResult`,
`MatcherHealth`) in the L1 `_types/matcher.py` layer, the
`MatcherError` family (`MatcherBackboneError`,
`InsufficientInliersError`), and the composition-root
`build_matcher_strategy` factory with lazy-import +
`BUILD_MATCHER_<variant>` gating per ADR-002.

`RollingHealthWindow` accumulator (60 s, amortised O(1) update,
strict O(1) snapshot) is constructed by the factory and injected
into every concrete matcher so all backbones share window
semantics; this is what backs C5's spoof-promotion gate.

Legacy placeholder `MatchResult` removed from `_types/matching.py`;
import-only consumers (`c4_pose.interface`, `c3_5_adhop.interface`)
repointed at the new `_types/matcher.py` home — zero behavioural
change to those components.

AC-9 (single-thread binding) and AC-10 (LightGlueRuntime
identity-share with C2.5) deferred to AZ-270 runtime-root
composition, mirroring the AZ-342 Risk-4 escape clause. All other
ACs + NFRs covered by 70 new conformance tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 05:43:33 +03:00
parent d6756f1855
commit 89c223882b
16 changed files with 1404 additions and 50 deletions
+98
View File
@@ -0,0 +1,98 @@
"""C3 cross-domain matcher DTOs (L1 cross-component layer; AZ-344).
Frozen by ``contracts/c3_matcher/cross_domain_matcher_protocol.md``
v1.0.0: three slotted, immutable dataclasses
(:class:`CandidateMatchSet`, :class:`MatchResult`,
:class:`MatcherHealth`).
:class:`CandidateMatchSet.tile_id` is a plain
``tuple[int, float, float]`` of ``(zoom_level, lat, lon)`` —
identical encoding to :class:`VprCandidate.tile_id` /
:class:`RerankCandidate.tile_id` — keeping the L1 layer free of an
L1→L3 import per ``module-layout.md`` (consumers reconstruct
:class:`gps_denied_onboard.components.c6_tile_cache.TileId` at the
C6 boundary).
:class:`MatchResult.per_candidate` is a ``tuple`` (not a ``list``)
so the ``frozen=True, slots=True`` invariant truly holds — a frozen
dataclass holding a mutable list lets consumers mutate it; the
tuple closes that door.
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
__all__ = ["CandidateMatchSet", "MatchResult", "MatcherHealth"]
@dataclass(frozen=True, slots=True)
class CandidateMatchSet:
"""Per-candidate matching outcome inside a :class:`MatchResult`.
``inlier_correspondences`` is shape ``(I, 4)`` ``float32`` with
columns ``(px_query, py_query, px_tile, py_tile)``; rows are
RANSAC inliers only so ``I == inlier_count``.
``per_candidate_residual_px`` is the MEDIAN reprojection
residual on inliers — not the mean, not the max. C3.5's
threshold gate compares against this value (INV-8).
"""
tile_id: tuple[int, float, float]
inlier_count: int
inlier_correspondences: np.ndarray
ransac_outlier_count: int
per_candidate_residual_px: float
@dataclass(frozen=True, slots=True)
class MatchResult:
"""Cross-domain match outcome for one frame.
Consumed by C3.5 :class:`ConditionalRefiner` (AZ-348). The
``per_candidate`` tuple is sorted descending by
``inlier_count`` with ties broken ascending by
``per_candidate_residual_px`` (INV-3) so
``best_candidate_idx == 0`` by construction.
``reprojection_residual_px`` is the best candidate's median
residual (mirrors ``per_candidate[0].per_candidate_residual_px``)
surfaced separately so consumers do not have to know the
ranking encoding.
"""
frame_id: int
per_candidate: tuple[CandidateMatchSet, ...]
best_candidate_idx: int
reprojection_residual_px: float
matched_at: int
matcher_label: str
candidates_input: int
candidates_dropped: int
@dataclass(frozen=True, slots=True)
class MatcherHealth:
"""Rolling-window matcher health snapshot.
Produced by :meth:`CrossDomainMatcher.health_snapshot`. Drives
C5's spoof-promotion gate (AC-NEW-2 / AC-NEW-7) and post-flight
forensics.
``consecutive_low_inlier`` is the count of CONSECUTIVE recent
frames whose best-candidate inlier count fell below the
configured ``min_inliers_threshold`` floor; it resets to zero on
any frame whose inlier count meets or exceeds the floor
(INV-12).
``mean_inliers_60s`` is the rolling 60 s mean of best-candidate
inlier counts. ``backbone_error_count_60s`` counts per-candidate
:class:`MatcherBackboneError` occurrences in the same window.
"""
consecutive_low_inlier: int
mean_inliers_60s: float
backbone_error_count_60s: int
+9 -14
View File
@@ -1,25 +1,20 @@
"""C3 / shared cross-domain matching DTOs."""
"""Shared LightGlue-runtime DTOs (L1 helper layer; AZ-278).
The cross-component ``MatchResult`` DTO previously lived here under
a placeholder schema; AZ-344 froze the real shape in
:mod:`gps_denied_onboard._types.matcher`. The two surviving classes
(:class:`KeypointSet`, :class:`CorrespondenceSet`) are the L1
helper substrate for :class:`gps_denied_onboard.helpers.lightglue_runtime.LightGlueRuntime`
and stay here.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
@dataclass(frozen=True)
class MatchResult:
"""Output of the cross-domain matcher (frame ↔ satellite tile)."""
query_frame_id: int
tile_id: str
keypoints_query: Any
keypoints_tile: Any
matches: Any
inlier_mask: Any | None = None
@dataclass(frozen=True)
class KeypointSet:
"""A backbone-extracted keypoint + descriptor bundle.
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard._types.matcher import MatchResult
class AdHoPRefinementStrategy(Protocol):
@@ -1,6 +1,51 @@
"""C3 Cross-Domain Matcher component — Public API."""
"""C3 cross-domain matcher — Public API (AZ-344).
from gps_denied_onboard._types.matching import MatchResult
Per ``cross_domain_matcher_protocol.md`` v1.0.0 the public surface
consists of:
- :class:`CrossDomainMatcher` Protocol (two methods).
- DTOs re-exported from :mod:`gps_denied_onboard._types.matcher`
(the L1 home for cross-component DTOs):
:class:`MatchResult`, :class:`MatcherHealth`. The internal
per-candidate sub-DTO :class:`CandidateMatchSet` is intentionally
re-exported too because C3.5 reads
``MatchResult.per_candidate[i]`` directly.
- Error family rooted at :class:`MatcherError`; two documented
subtypes (:class:`MatcherBackboneError`,
:class:`InsufficientInliersError`).
- Config block :class:`C3MatcherConfig` (registered on import).
Internals — :class:`RollingHealthWindow` and the concrete strategy
modules (``disk_lightglue``, ``aliked_lightglue``, ``xfeat``) —
are intentionally NOT re-exported: consumers see only the
Protocol; concrete strategies are imported lazily by
:mod:`gps_denied_onboard.runtime_root.matcher_factory` (Risk-2
mitigation).
"""
from gps_denied_onboard._types.matcher import (
CandidateMatchSet,
MatchResult,
MatcherHealth,
)
from gps_denied_onboard.components.c3_matcher.config import C3MatcherConfig
from gps_denied_onboard.components.c3_matcher.errors import (
InsufficientInliersError,
MatcherBackboneError,
MatcherError,
)
from gps_denied_onboard.components.c3_matcher.interface import CrossDomainMatcher
from gps_denied_onboard.config.schema import register_component_block
__all__ = ["CrossDomainMatcher", "MatchResult"]
register_component_block("c3_matcher", C3MatcherConfig)
__all__ = [
"C3MatcherConfig",
"CandidateMatchSet",
"CrossDomainMatcher",
"InsufficientInliersError",
"MatchResult",
"MatcherBackboneError",
"MatcherError",
"MatcherHealth",
]
@@ -0,0 +1,122 @@
"""C3 rolling matcher-health accumulator (AZ-344).
:class:`RollingHealthWindow` maintains the three accumulators that
back :class:`MatcherHealth` snapshots: ``consecutive_low_inlier``,
``mean_inliers_60s``, ``backbone_error_count_60s``. The 60 s window
is configurable for tests via the constructor.
The structure is intentionally **single-thread** — no locks. The
composition root binds every :class:`CrossDomainMatcher` to one
ingest thread (AC-9) so adding a lock here would only mask binding
bugs.
Data structure choice: a ``collections.deque`` of
``(timestamp_ns, inlier_count, had_backbone_error)`` tuples plus
two running sums (``_inlier_sum``, ``_error_sum``). Every
:meth:`update` call evicts expired entries from the left while
maintaining the sums — amortised O(1). :meth:`snapshot` is strict
O(1): it reads the sums and the current ``len(self._window)``
without touching the deque body, so the NFR-perf-window microbench
(p99 ≤ 50 µs) holds even with 6000 entries (100 Hz × 60 s).
"""
from __future__ import annotations
from collections import deque
from typing import Final
from gps_denied_onboard._types.matcher import MatcherHealth
__all__ = ["RollingHealthWindow"]
_DEFAULT_WINDOW_NS: Final[int] = 60 * 1_000_000_000
class RollingHealthWindow:
"""Sliding 60 s window over best-candidate inlier counts.
Constructor-injected into every concrete :class:`CrossDomainMatcher`
so all strategies share semantics (the alternative — every
matcher reimplements the window — drifts between backbones and
breaks C5's spoof-promotion gate consistency).
"""
def __init__(
self,
*,
min_inliers_threshold: int,
window_ns: int = _DEFAULT_WINDOW_NS,
) -> None:
if min_inliers_threshold < 1:
raise ValueError(
"min_inliers_threshold must be >= 1; "
f"got {min_inliers_threshold}"
)
if window_ns < 1:
raise ValueError(f"window_ns must be >= 1; got {window_ns}")
self._min_inliers_threshold: int = min_inliers_threshold
self._window_ns: int = window_ns
# Each entry: (timestamp_ns, inlier_count, had_backbone_error)
self._window: deque[tuple[int, int, bool]] = deque()
self._inlier_sum: int = 0
self._error_sum: int = 0
self._consecutive_low_inlier: int = 0
@property
def window_ns(self) -> int:
return self._window_ns
def update(
self,
*,
timestamp_ns: int,
best_inlier_count: int,
had_backbone_error: bool,
) -> None:
"""Record one frame's outcome and evict any expired entries.
``best_inlier_count`` is the BEST candidate's inlier count
for the frame (zero if every candidate was dropped /
below-threshold). ``had_backbone_error`` is ``True`` if at
least one per-candidate
:class:`MatcherBackboneError` fired in this frame.
AC-12: ``consecutive_low_inlier`` increments when
``best_inlier_count < min_inliers_threshold``; resets to
zero on any frame whose count meets or exceeds the floor.
"""
if best_inlier_count < 0:
raise ValueError(
f"best_inlier_count must be >= 0; got {best_inlier_count}"
)
cutoff = timestamp_ns - self._window_ns
window = self._window
while window and window[0][0] <= cutoff:
_ts, expired_inliers, expired_error = window.popleft()
self._inlier_sum -= expired_inliers
if expired_error:
self._error_sum -= 1
window.append((timestamp_ns, best_inlier_count, had_backbone_error))
self._inlier_sum += best_inlier_count
if had_backbone_error:
self._error_sum += 1
if best_inlier_count < self._min_inliers_threshold:
self._consecutive_low_inlier += 1
else:
self._consecutive_low_inlier = 0
def snapshot(self) -> MatcherHealth:
"""Return the current :class:`MatcherHealth` snapshot.
O(1): reads the running sums + ``len(self._window)`` only.
Empty window → ``mean_inliers_60s == 0.0`` (consumers
treat zero as "insufficient data" rather than "zero matches").
"""
count = len(self._window)
mean = (self._inlier_sum / count) if count else 0.0
return MatcherHealth(
consecutive_low_inlier=self._consecutive_low_inlier,
mean_inliers_60s=mean,
backbone_error_count_60s=self._error_sum,
)
@@ -0,0 +1,67 @@
"""C3 ``CrossDomainMatcher`` config block (AZ-344).
Registered into ``config.components['c3_matcher']`` by the package
``__init__.py``. The composition-root factory
:func:`gps_denied_onboard.runtime_root.matcher_factory.build_matcher_strategy`
reads this block to select the strategy and configure thresholds.
``strategy`` selects one of the three concrete backbones
(``disk_lightglue``, ``aliked_lightglue``, ``xfeat``); the
composition-root factory respects compile-time
``BUILD_MATCHER_<variant>`` gating on top of this label.
``min_inliers_threshold`` is the per-candidate floor: candidates
whose RANSAC inlier count falls below this value are treated as
failed (drop-and-continue) and counted into
``MatcherHealth.consecutive_low_inlier``. Default 60 — leaves
headroom below the AC-1.1 floor (p5 ≥ 80) so calibration drift
does not immediately trip the spoof gate; FT-P-19 telemetry will
tune it.
``residual_warn_threshold_px`` is the median reprojection-residual
limit (pixels) above which the matcher emits a WARN log; default
2.5 px (the AC-1.2 floor).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Final
from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"C3MatcherConfig",
"KNOWN_STRATEGIES",
]
KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset(
{"disk_lightglue", "aliked_lightglue", "xfeat"}
)
@dataclass(frozen=True)
class C3MatcherConfig:
"""Per-component config for C3 cross-domain matcher."""
strategy: str = "disk_lightglue"
min_inliers_threshold: int = 60
residual_warn_threshold_px: float = 2.5
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STRATEGIES:
raise ConfigError(
f"C3MatcherConfig.strategy={self.strategy!r} not in "
f"{sorted(KNOWN_STRATEGIES)}"
)
if self.min_inliers_threshold < 1:
raise ConfigError(
"C3MatcherConfig.min_inliers_threshold must be >= 1; "
f"got {self.min_inliers_threshold}"
)
if self.residual_warn_threshold_px <= 0.0:
raise ConfigError(
"C3MatcherConfig.residual_warn_threshold_px must be > 0; "
f"got {self.residual_warn_threshold_px}"
)
@@ -0,0 +1,55 @@
"""C3 ``CrossDomainMatcher`` error taxonomy (AZ-344).
The family is intentionally narrow: a per-candidate failure is the
normal case (drop-and-continue, INV-4) and is signalled via
``MatchResult.candidates_dropped`` — NOT via an exception. An
exception escapes :meth:`CrossDomainMatcher.match` only when EVERY
candidate fails OR every candidate's inlier count falls below
``config.matcher.min_inliers_threshold``; both surface as
:class:`InsufficientInliersError` which is the C5 → VIO-only
fallback trigger per AC-3.5.
:class:`MatcherBackboneError` is raised INSIDE the per-candidate
loop, caught by the strategy, logged ERROR, FDR-stamped, and the
candidate is dropped. It is exposed publicly so the per-candidate
log + FDR taxonomy is observable and so future matchers using a
different backbone can re-raise the same kind.
"""
from __future__ import annotations
__all__ = [
"InsufficientInliersError",
"MatcherBackboneError",
"MatcherError",
]
class MatcherError(Exception):
"""Base class for the C3 matcher error family.
Caught at the runtime root only when
:class:`InsufficientInliersError` fires; per-candidate failures
stay inside the strategy.
"""
class MatcherBackboneError(MatcherError):
"""Per-candidate backbone forward-pass failure.
CUDA OOM, TRT engine deserialize mismatch. Logged at ERROR; one
FDR record per occurrence; the offending candidate is dropped
from the match set; the surrounding :meth:`match` call continues
with the remaining candidates (INV-4).
"""
class InsufficientInliersError(MatcherError):
"""Zero survivors after the per-candidate loop, OR every
candidate's inlier count is below
``config.matcher.min_inliers_threshold``.
Logged at ERROR; FDR record ``kind=matcher.insufficient_inliers``
or ``kind=matcher.all_failed`` (per the trigger). C5 falls back
to VIO-only with provenance ``visual_propagated`` (AC-3.5).
"""
@@ -1,19 +1,120 @@
"""C3 `CrossDomainMatcher` Protocol.
"""C3 ``CrossDomainMatcher`` Protocol (AZ-344).
Concrete impls: DISK+LightGlue (primary), ALIKED+LightGlue, XFeat. See
`_docs/02_document/components/04_c3_matcher/`.
PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; a
two-method surface: :meth:`match` and :meth:`health_snapshot`.
Concrete impls — DISK+LightGlue (AZ-345), ALIKED+LightGlue
(AZ-346), XFeat (AZ-347) — live in sibling modules and are imported
lazily by
:mod:`gps_denied_onboard.runtime_root.matcher_factory`.
The contract at
``_docs/02_document/contracts/c3_matcher/cross_domain_matcher_protocol.md``
v1.0.0 is the authoritative shape; this module mirrors it 1:1.
"""
from __future__ import annotations
from typing import Protocol
from typing import TYPE_CHECKING, Protocol, runtime_checkable
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.tile import Tile
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.matcher import MatchResult, MatcherHealth
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.rerank import RerankResult
__all__ = ["CrossDomainMatcher"]
@runtime_checkable
class CrossDomainMatcher(Protocol):
"""Match a nav-camera frame against a satellite tile."""
"""Cross-domain (nav-camera ↔ satellite-imagery) matcher strategy.
def match(self, frame: NavCameraFrame, tile: Tile) -> MatchResult: ...
Stateless per-frame; the only persistent state is the
constructor-injected backbone runtime handles
(``InferenceRuntime`` for feature extraction,
:class:`gps_denied_onboard.helpers.lightglue_runtime.LightGlueRuntime`
for matching,
:class:`gps_denied_onboard.helpers.ransac_filter.RansacFilter`
for inlier filtering) and the rolling
:class:`gps_denied_onboard.components.c3_matcher._health_window.RollingHealthWindow`.
Invariants (see ``cross_domain_matcher_protocol.md`` v1.0.0):
- **INV-1 single-threaded** — each instance is bound to one
ingest thread by the composition root (same thread as C2.5
because they share the ``LightGlueRuntime``).
- **INV-2 stateless per-frame for `match`** — except for the
rolling health window, no implicit dependency on prior
frames; reordering ``match`` calls (tests only) MUST yield
identical ``MatchResult`` content.
- **INV-3 best-candidate selection is deterministic** —
``MatchResult.best_candidate_idx == 0`` and
``per_candidate`` is sorted by ``inlier_count`` descending
with ties broken by ``per_candidate_residual_px`` ascending
(lower residual wins).
- **INV-4 drop-and-continue per candidate** — per-candidate
exceptions never propagate out of ``match`` unless every
candidate fails. Mirrors C2.5 INV-8.
- **INV-5 ``per_candidate`` length is bounded** —
``0 < len <= len(rerank_result.candidates)``; zero raises
:class:`InsufficientInliersError`; never exceeds the input N.
- **INV-6 ``matcher_label`` is non-empty** — every
``MatchResult`` carries the strategy's name
(e.g., ``"disk_lightglue"``) for FDR provenance; MUST match
``BUILD_MATCHER_<variant>`` lowercase.
- **INV-7 ``inlier_correspondences`` shape contract** —
``ndarray[I, 4, dtype=float32]``, columns
``(px_query, py_query, px_tile, py_tile)``; rows are RANSAC
inliers only; ``I == inlier_count``.
- **INV-8 ``reprojection_residual_px`` is the BEST candidate's
median residual** — not the mean, not the max; downstream
C3.5's threshold gate compares against this value.
- **INV-9 ``health_snapshot()`` is cheap** — O(1); reads the
rolling window's pre-computed accumulators. Never recomputes
over the window contents.
Error envelope: only :class:`InsufficientInliersError` escapes
:meth:`match`; per-candidate
:class:`MatcherBackboneError` and C6 ``TileFetchError``
instances are caught inside the loop and turned into dropped
candidates + ERROR logs + per-occurrence FDR records.
"""
def match(
self,
frame: "NavCameraFrame",
rerank_result: "RerankResult",
calibration: "CameraCalibration",
) -> "MatchResult":
"""Match a frame against every candidate in ``rerank_result``.
For each candidate:
1. Extract features from the nav frame and from the tile
pixels (via the constructor-injected
:class:`InferenceRuntime` handle).
2. Run LightGlue forward via the shared
:class:`LightGlueRuntime`.
3. RANSAC-filter correspondences via the shared
:class:`RansacFilter`; record inliers + median residual.
Sort survivors by ``(inlier_count desc, residual asc)`` and
return as :class:`MatchResult`. Drop-and-continue
semantics apply per INV-4.
Raises:
InsufficientInliersError: zero survivors after the
per-candidate loop, OR every candidate's
``inlier_count`` is below
``config.matcher.min_inliers_threshold``.
"""
...
def health_snapshot(self) -> "MatcherHealth":
"""Return a rolling-window matcher health snapshot.
O(1) per INV-9. Drives C5's spoof-promotion gate
(AC-NEW-2 / AC-NEW-7) and post-flight forensics.
"""
...
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard._types.matcher import MatchResult
from gps_denied_onboard._types.pose import CovarianceMode, PoseEstimate
from gps_denied_onboard._types.thermal import ThermalState
@@ -0,0 +1,196 @@
"""C3 matcher strategy composition-root factory (AZ-344).
:func:`build_matcher_strategy` selects exactly one strategy by
``config.components['c3_matcher'].strategy`` and respects
compile-time ``BUILD_MATCHER_<variant>`` gating: requesting a
strategy whose flag is OFF raises
:class:`StrategyNotAvailableError` at composition time (NOT at
first frame).
Concrete strategy modules
(``disk_lightglue``, ``aliked_lightglue``, ``xfeat``) are imported
lazily — a Tier-0 workstation build with
``BUILD_MATCHER_DISK_LIGHTGLUE=OFF`` MUST NOT load
``c3_matcher.disk_lightglue`` (ADR-002 / I-5; verifiable via
``sys.modules``).
The shared :class:`LightGlueRuntime` and :class:`RansacFilter` are
constructor-injected — the factory does NOT own their lifecycles.
The runtime root constructs ONE ``LightGlueRuntime`` instance and
passes the SAME reference to both this factory (C3) and the C2.5
``ReRankStrategy`` factory (per AC-10 / AZ-342 AC-10). The
identity-share gives R14 fix substance: a regression that
constructs two runtimes would double GPU memory.
The :class:`RollingHealthWindow` accumulator is constructed BY
this factory (one per matcher instance) and passed to the
concrete strategy's ``create`` entry-point so all backbones share
window semantics (AZ-344 Outcome line 5).
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
from gps_denied_onboard.components.c3_matcher._health_window import RollingHealthWindow
from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError
if TYPE_CHECKING:
from gps_denied_onboard.components.c3_matcher import (
C3MatcherConfig,
CrossDomainMatcher,
)
from gps_denied_onboard.components.c7_inference import InferenceRuntime
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.helpers.lightglue_runtime import LightGlueRuntime
from gps_denied_onboard.helpers.ransac_filter import RansacFilter
__all__ = ["build_matcher_strategy"]
_LOG = logging.getLogger("gps_denied_onboard.c3_matcher")
# Strategy resolution table — mirrors the contract's
# ``cross_domain_matcher_protocol.md`` v1.0.0 § Composition-Root
# Factory table verbatim. ANY mutation of this dict MUST be
# mirrored in the contract.
_STRATEGY_TO_BUILD_FLAG: dict[str, str] = {
"disk_lightglue": "BUILD_MATCHER_DISK_LIGHTGLUE",
"aliked_lightglue": "BUILD_MATCHER_ALIKED_LIGHTGLUE",
"xfeat": "BUILD_MATCHER_XFEAT",
}
_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = {
"disk_lightglue": (
"gps_denied_onboard.components.c3_matcher.disk_lightglue",
"DiskLightGlueMatcher",
),
"aliked_lightglue": (
"gps_denied_onboard.components.c3_matcher.aliked_lightglue",
"AlikedLightGlueMatcher",
),
"xfeat": (
"gps_denied_onboard.components.c3_matcher.xfeat",
"XFeatMatcher",
),
}
def _is_build_flag_on(flag_name: str) -> bool:
"""Read a compile-time ``BUILD_*`` flag from the environment.
``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``;
anything else (including unset) → ``False``. Defaults to OFF so
test environments must opt-in explicitly per strategy.
"""
raw = os.environ.get(flag_name, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}
def _c3_config(config: "Config") -> "C3MatcherConfig":
"""Pull the registered C3 config block.
``c3_matcher.__init__`` registers it on import; a missing
registration is a developer error and surfaces as ``KeyError``
rather than a silent fallback.
"""
return config.components["c3_matcher"]
def build_matcher_strategy(
config: "Config",
*,
lightglue_runtime: "LightGlueRuntime",
ransac_filter: "RansacFilter",
inference_runtime: "InferenceRuntime",
) -> "CrossDomainMatcher":
"""Construct the :class:`CrossDomainMatcher` impl selected by config.
1. Reads ``config.components['c3_matcher'].strategy``.
2. Checks the matching ``BUILD_MATCHER_<variant>`` flag — if
OFF, raises :class:`StrategyNotAvailableError` BEFORE any
import.
3. Constructs a :class:`RollingHealthWindow` seeded with
``config.components['c3_matcher'].min_inliers_threshold``.
4. Lazily imports the concrete strategy module.
5. Constructs the strategy via its module-level ``create(
config, lightglue_runtime, ransac_filter, inference_runtime,
health_window)`` factory function (each concrete strategy
module exports ``create`` as its public entry-point;
concrete constructors stay private).
6. Emits ONE INFO log ``kind="c3.matcher.strategy_loaded"``
with structured fields ``{strategy, min_inliers_threshold,
residual_warn_threshold_px}``.
Raises:
StrategyNotAvailableError: compile-time flag OFF or
concrete module not yet built
(AZ-345 / AZ-346 / AZ-347 pending).
"""
block = _c3_config(config)
strategy = block.strategy
flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy)
module_info = _STRATEGY_TO_MODULE.get(strategy)
if flag_name is None or module_info is None:
# Defensive — config validation rejects unknown strategy
# labels at load (``C3MatcherConfig.__post_init__``), so
# this branch is only reachable if the resolution table
# and the validation set drift apart.
_LOG.error(
"c3.matcher.build_flag_off",
extra={"strategy": strategy, "reason": "unknown_strategy"},
)
raise StrategyNotAvailableError(
f"CrossDomainMatcher {strategy!r} is not buildable in this binary."
)
if not _is_build_flag_on(flag_name):
_LOG.error(
"c3.matcher.build_flag_off",
extra={"strategy": strategy, "flag": flag_name},
)
raise StrategyNotAvailableError(
f"BUILD_MATCHER_{strategy.upper()} is OFF for this binary; "
f"cannot select strategy={strategy}."
)
health_window = RollingHealthWindow(
min_inliers_threshold=block.min_inliers_threshold,
)
module_name, class_name = module_info
try:
module = __import__(module_name, fromlist=[class_name])
except ModuleNotFoundError as exc:
raise StrategyNotAvailableError(
f"CrossDomainMatcher {strategy!r} is configured but its concrete "
f"impl module {module_name!r} has not been built into this binary "
"yet (AZ-345 / AZ-346 / AZ-347 pending)."
) from exc
create_fn = getattr(module, "create", None)
if create_fn is None:
strategy_cls = getattr(module, class_name)
instance = strategy_cls(
config,
lightglue_runtime=lightglue_runtime,
ransac_filter=ransac_filter,
inference_runtime=inference_runtime,
health_window=health_window,
)
else:
instance = create_fn(
config,
lightglue_runtime=lightglue_runtime,
ransac_filter=ransac_filter,
inference_runtime=inference_runtime,
health_window=health_window,
)
_LOG.info(
"c3.matcher.strategy_loaded",
extra={
"strategy": strategy,
"min_inliers_threshold": block.min_inliers_threshold,
"residual_warn_threshold_px": block.residual_warn_threshold_px,
},
)
return instance