[AZ-345] [AZ-346] [AZ-347] [AZ-349] C3 matchers + C3.5 AdHoP refiner

Implement the three concrete C3 CrossDomainMatcher strategies plus the
C3.5 production-default AdHoPRefiner.

C3 (AZ-345/346/347):
- DiskLightGlueMatcher + AlikedLightGlueMatcher share a single shared
  _pipeline.run_lightglue_pipeline orchestrator (decode -> query
  extract -> per-candidate loop -> RANSAC sort -> health update ->
  FDR emit) so the only per-backbone delta is the keypoint+descriptor
  extractor closure. ALIKED adds a create-time engine output-schema
  probe (AC-special-1).
- XFeatMatcher owns its own per-candidate loop (single forward fuses
  extraction + matching); it re-uses the shared FDR emission helpers
  to keep telemetry byte-identical across strategies. lightglue_runtime
  parameter accepted by factory but discarded (AC-special-1).
- All three consume the shared LightGlueRuntime / RansacFilter /
  RollingHealthWindow helpers; no helper forks. InferenceRuntimeCut
  consumer-side Protocol added per AZ-507.

C3.5 (AZ-349):
- AdHoPRefiner implements the <= conditional gate, runs the OrthoLoC
  AdHoP TRT engine over best-candidate correspondences, re-runs RANSAC
  on the perspective-preconditioned set, and emits an enriched
  MatchResult with refinement_label="adhop".
- Invariant 4 passthrough fall-through: any RefinerBackboneError (TRT
  failure, OOM, NaN, bad shape) is caught, logged ERROR, FDR-emitted
  with error: true, and converted to passthrough that still counts
  against the rolling invocation-rate window. MemoryError and other
  non-listed exceptions propagate by design (AC-5 closed-set
  semantics).
- Rolling 60-s invocation-rate window + rate-limited WARN log
  (configurable via ratelimited_warn_window_ns; default 60 s).

Shared changes:
- C3MatcherConfig + C3_5RefinerConfig extended with the new
  weights/threshold/window fields.
- matcher_factory + refiner_factory optionally forward clock +
  fdr_client to the strategy's create(); backward-compatible.
- fdr_client.records registers five new kinds: matcher.frame_done,
  matcher.backbone_error, matcher.insufficient_inliers,
  matcher.all_failed, refiner.frame_done.

Tests: 66 new (43 C3 parametrised + 23 AdHoP) covering 47/47 ACs;
focused suite green; full project test suite green except for one
pre-existing flaky CLI cold-start timing test unrelated to this batch.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 04:09:22 +03:00
parent 06f655d8fb
commit a1185d0a28
19 changed files with 4855 additions and 6 deletions
@@ -0,0 +1,509 @@
"""``AdHoPRefiner`` — C3.5 production-default ConditionalRefiner (AZ-349).
Implements the conditional gate documented in
``conditional_refiner_protocol.md`` v1.0.0:
- ``mr.reprojection_residual_px <= residual_threshold_px`` → passthrough
(return ``mr`` unchanged, ``was_invoked() == False``).
- Otherwise → invoke the OrthoLoC AdHoP TRT engine to perspective-
precondition the BEST candidate's correspondences, re-RANSAC, and
emit a new :class:`MatchResult` via :func:`dataclasses.replace`
with ``refinement_label="adhop"``.
INV-4 (passthrough fall-through): :class:`RefinerBackboneError`
raised inside the invoked path is caught, logged at ERROR, emitted
to FDR with ``error: true``, and converted to passthrough — the
input ``mr`` is returned unchanged with ``was_invoked() == True``
(the attempt still counts against the rolling invocation rate). Any
other exception type (e.g. ``MemoryError``) re-propagates by design
— Invariant 4's contract is intentionally closed-set per AC-5.
A 60 s rolling window tracks per-frame ``was_invoked`` outcomes for
the WARN log when the invocation rate exceeds
``config.refiner.invocation_rate_warn_threshold``; the warning is
rate-limited to one record per
``config.refiner.ratelimited_warn_window_ns`` (default 60 s) so a
misbehaving threshold cannot flood the log pipeline.
Layering:
- Imports only L1 (``_types``, ``helpers``, ``fdr_client``, ``clock``,
``config``) and sibling L3 modules (``interface``, ``errors``,
``config``, ``inference_runtime_cut``).
- The C7 inference runtime is accepted through the consumer-side
:class:`InferenceRuntimeCut` Protocol (AZ-507).
"""
from __future__ import annotations
import dataclasses
import logging
from collections import deque
from typing import TYPE_CHECKING, Final
import numpy as np
from gps_denied_onboard._types.inference import BuildConfig, PrecisionMode
from gps_denied_onboard._types.matcher import CandidateMatchSet, MatchResult
from gps_denied_onboard.components.c3_5_adhop.errors import (
RefinerBackboneError,
RefinerConfigError,
)
from gps_denied_onboard.components.c3_5_adhop.inference_runtime_cut import (
InferenceRuntimeCut,
)
from gps_denied_onboard.fdr_client import EnqueueResult, FdrRecord
from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION
from gps_denied_onboard.helpers.iso_timestamps import iso_ts_from_clock
from gps_denied_onboard.helpers.ransac_filter import RansacFilterError
if TYPE_CHECKING:
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.fdr_client import FdrClient
from gps_denied_onboard.helpers.ransac_filter import RansacFilter
__all__ = ["MODEL_NAME", "AdHoPRefiner", "create"]
MODEL_NAME: Final[str] = "adhop"
_REFINEMENT_LABEL_ADHOP: Final[str] = "adhop"
_REFINEMENT_LABEL_PASSTHROUGH: Final[str] = "passthrough"
_FDR_PRODUCER_ID: Final[str] = "c3_5_adhop.adhop_refiner"
_FDR_KIND_FRAME_DONE: Final[str] = "refiner.frame_done"
_LOG_KIND_READY: Final[str] = "c3_5.refiner.ready"
_LOG_KIND_BACKBONE_ERROR: Final[str] = "c3_5.refiner.backbone_error"
_LOG_KIND_INVOCATION_RATE: Final[str] = "c3_5.refiner.invocation_rate_high"
_LOG_KIND_FDR_OVERRUN: Final[str] = "c3_5.refiner.fdr_overrun"
# Rolling window for invocation-rate accounting.
_ROLLING_WINDOW_NS: Final[int] = 60 * 1_000_000_000
# AdHoP engine I/O contract: takes the raw correspondences and
# returns the perspective-preconditioned set. Real upstream contract
# is method-agnostic per OrthoLoC; the runtime ingests them as
# ``correspondences`` in / out.
_INPUT_CORRESPONDENCES: Final[str] = "correspondences"
_OUTPUT_CORRESPONDENCES: Final[str] = "correspondences"
class AdHoPRefiner:
"""OrthoLoC AdHoP refiner.
Single-threaded by contract (Invariant 1); the rolling window
and ``_was_invoked`` flag are non-thread-safe by design.
"""
def __init__(
self,
*,
inference_runtime: InferenceRuntimeCut,
engine_handle: object,
ransac_filter: type["RansacFilter"],
invocation_rate_warn_threshold: float,
ratelimited_warn_window_ns: int,
ransac_threshold_px: float,
min_inliers_threshold: int,
clock: "Clock",
fdr_client: "FdrClient | None",
logger: logging.Logger,
) -> None:
if invocation_rate_warn_threshold <= 0.0 or invocation_rate_warn_threshold >= 1.0:
raise RefinerConfigError(
"AdHoPRefiner.invocation_rate_warn_threshold must be in (0, 1); "
f"got {invocation_rate_warn_threshold}"
)
if ratelimited_warn_window_ns < 1:
raise RefinerConfigError(
"AdHoPRefiner.ratelimited_warn_window_ns must be >= 1; "
f"got {ratelimited_warn_window_ns}"
)
if ransac_threshold_px <= 0.0:
raise RefinerConfigError(
"AdHoPRefiner.ransac_threshold_px must be > 0; "
f"got {ransac_threshold_px}"
)
if min_inliers_threshold < 1:
raise RefinerConfigError(
"AdHoPRefiner.min_inliers_threshold must be >= 1; "
f"got {min_inliers_threshold}"
)
self._inference_runtime = inference_runtime
self._engine_handle = engine_handle
self._ransac_filter = ransac_filter
self._invocation_rate_warn_threshold = float(invocation_rate_warn_threshold)
self._ratelimited_warn_window_ns = int(ratelimited_warn_window_ns)
self._ransac_threshold_px = float(ransac_threshold_px)
self._min_inliers_threshold = int(min_inliers_threshold)
self._clock = clock
self._fdr_client = fdr_client
self._logger = logger
self._was_invoked: bool = False
# Window entries: (timestamp_ns, was_invoked)
self._invocation_window: deque[tuple[int, bool]] = deque()
self._last_warn_ns: int = -1
def refine_if_needed(
self,
frame: "NavCameraFrame",
mr: MatchResult,
residual_threshold_px: float,
) -> MatchResult:
if residual_threshold_px <= 0.0:
raise ValueError(
f"residual_threshold_px must be > 0; got {residual_threshold_px}"
)
start_ns = int(self._clock.monotonic_ns())
# Gate
if mr.reprojection_residual_px <= residual_threshold_px:
self._was_invoked = False
self._record_invocation(now_ns=start_ns, invoked=False)
self._emit_frame_done(
frame_id=int(mr.frame_id),
invoked=False,
refinement_label=_REFINEMENT_LABEL_PASSTHROUGH,
added_latency_ms=0.0,
pre_residual=float(mr.reprojection_residual_px),
post_residual=float(mr.reprojection_residual_px),
inlier_count_before=int(
mr.per_candidate[mr.best_candidate_idx].inlier_count
),
inlier_count_after=int(
mr.per_candidate[mr.best_candidate_idx].inlier_count
),
error=False,
)
return mr
# Invoked path
self._was_invoked = True
self._record_invocation(now_ns=start_ns, invoked=True)
self._maybe_warn_invocation_rate(
now_ns=start_ns, frame_id=int(mr.frame_id)
)
best_idx = mr.best_candidate_idx
best = mr.per_candidate[best_idx]
try:
refined = self._run_adhop(best.inlier_correspondences)
new_set = self._rerunsac(best=best, refined=refined)
except RefinerBackboneError as exc:
elapsed_ms = self._elapsed_ms(start_ns)
self._log_backbone_error(frame_id=int(mr.frame_id), error=exc)
self._emit_frame_done(
frame_id=int(mr.frame_id),
invoked=True,
refinement_label=_REFINEMENT_LABEL_PASSTHROUGH,
added_latency_ms=elapsed_ms,
pre_residual=float(mr.reprojection_residual_px),
post_residual=float(mr.reprojection_residual_px),
inlier_count_before=int(best.inlier_count),
inlier_count_after=int(best.inlier_count),
error=True,
)
return mr
elapsed_ms = self._elapsed_ms(start_ns)
# Replace ONLY the best candidate; other candidates remain
# unchanged so downstream consumers that still index into
# ``per_candidate`` see consistent residual ordering.
new_per_candidate = list(mr.per_candidate)
new_per_candidate[best_idx] = new_set
new_mr = dataclasses.replace(
mr,
per_candidate=tuple(new_per_candidate),
reprojection_residual_px=float(new_set.per_candidate_residual_px),
refinement_label=_REFINEMENT_LABEL_ADHOP,
refinement_added_latency_ms=elapsed_ms,
)
self._emit_frame_done(
frame_id=int(mr.frame_id),
invoked=True,
refinement_label=_REFINEMENT_LABEL_ADHOP,
added_latency_ms=elapsed_ms,
pre_residual=float(mr.reprojection_residual_px),
post_residual=float(new_set.per_candidate_residual_px),
inlier_count_before=int(best.inlier_count),
inlier_count_after=int(new_set.inlier_count),
error=False,
)
return new_mr
def was_invoked(self) -> bool:
return self._was_invoked
def _run_adhop(self, correspondences: np.ndarray) -> np.ndarray:
"""Run the AdHoP TRT engine over the given correspondences.
Translates any backbone failure (TRT error, OOM, NaN, shape
mismatch) into :class:`RefinerBackboneError`. Other
exception types propagate (AC-5).
"""
if correspondences.ndim != 2 or correspondences.shape[1] != 4:
raise RefinerBackboneError(
f"AdHoP input correspondences must have shape (N, 4); "
f"got {correspondences.shape}"
)
try:
outputs = self._inference_runtime.infer(
self._engine_handle,
{_INPUT_CORRESPONDENCES: correspondences.astype(np.float32, copy=False)},
)
except (RuntimeError, ValueError) as exc:
# Map runtime/value errors from the C7 runtime onto the
# public RefinerBackboneError so the passthrough
# fall-through (Invariant 4) is triggered. MemoryError /
# SystemExit / KeyboardInterrupt deliberately not caught
# — AC-5 closed-set semantics.
raise RefinerBackboneError(
f"AdHoP backbone forward raised {type(exc).__name__}: {exc}"
) from exc
if _OUTPUT_CORRESPONDENCES not in outputs:
raise RefinerBackboneError(
f"AdHoP backbone forward returned unexpected keys: "
f"{sorted(outputs.keys())!r}; expected "
f"{_OUTPUT_CORRESPONDENCES!r}"
)
refined = np.asarray(outputs[_OUTPUT_CORRESPONDENCES], dtype=np.float32)
if refined.ndim != 2 or refined.shape[1] != 4:
raise RefinerBackboneError(
f"AdHoP backbone returned shape {refined.shape}; "
"expected (M, 4)"
)
if refined.size > 0 and not np.isfinite(refined).all():
raise RefinerBackboneError(
"AdHoP backbone produced non-finite (NaN/Inf) correspondences"
)
return refined
def _rerunsac(
self, *, best: CandidateMatchSet, refined: np.ndarray
) -> CandidateMatchSet:
if refined.shape[0] < 4:
raise RefinerBackboneError(
"AdHoP-preconditioned correspondences below RANSAC minimum "
f"(got {refined.shape[0]} pairs; need ≥4)"
)
try:
ransac_result = self._ransac_filter.filter_correspondences(
refined, self._ransac_threshold_px, self._min_inliers_threshold
)
except RansacFilterError as exc:
raise RefinerBackboneError(
f"AdHoP post-RANSAC failed: {exc}"
) from exc
if ransac_result.inlier_count == 0:
raise RefinerBackboneError(
"AdHoP post-RANSAC produced zero inliers"
)
return CandidateMatchSet(
tile_id=best.tile_id,
inlier_count=int(ransac_result.inlier_count),
inlier_correspondences=np.ascontiguousarray(
ransac_result.inlier_correspondences, dtype=np.float32
),
ransac_outlier_count=int(ransac_result.outlier_count),
per_candidate_residual_px=float(ransac_result.median_residual_px),
)
def _record_invocation(self, *, now_ns: int, invoked: bool) -> None:
cutoff = now_ns - _ROLLING_WINDOW_NS
window = self._invocation_window
while window and window[0][0] <= cutoff:
window.popleft()
window.append((now_ns, invoked))
def _invocation_rate(self) -> float:
if not self._invocation_window:
return 0.0
invoked = sum(1 for _ts, was_inv in self._invocation_window if was_inv)
return invoked / len(self._invocation_window)
def _maybe_warn_invocation_rate(self, *, now_ns: int, frame_id: int) -> None:
rate = self._invocation_rate()
if rate <= self._invocation_rate_warn_threshold:
return
if (
self._last_warn_ns >= 0
and now_ns - self._last_warn_ns < self._ratelimited_warn_window_ns
):
return
self._last_warn_ns = now_ns
self._logger.warning(
_LOG_KIND_INVOCATION_RATE,
extra={
"kind": _LOG_KIND_INVOCATION_RATE,
"kv": {
"frame_id": frame_id,
"rate": float(rate),
"target_threshold": float(self._invocation_rate_warn_threshold),
},
},
)
def _elapsed_ms(self, start_ns: int) -> float:
return max(0.0, (int(self._clock.monotonic_ns()) - start_ns) / 1_000_000.0)
def _log_backbone_error(self, *, frame_id: int, error: BaseException) -> None:
self._logger.error(
_LOG_KIND_BACKBONE_ERROR,
extra={
"kind": _LOG_KIND_BACKBONE_ERROR,
"kv": {
"frame_id": frame_id,
"error_type": type(error).__name__,
"phase": "adhop_forward",
},
},
)
def _emit_frame_done(
self,
*,
frame_id: int,
invoked: bool,
refinement_label: str,
added_latency_ms: float,
pre_residual: float,
post_residual: float,
inlier_count_before: int,
inlier_count_after: int,
error: bool,
) -> None:
if self._fdr_client is None:
return
payload: dict[str, object] = {
"frame_id": int(frame_id),
"was_invoked": bool(invoked),
"refinement_label": refinement_label,
"refinement_added_latency_ms": float(added_latency_ms),
"pre_residual_px": float(pre_residual),
"post_residual_px": float(post_residual),
"inlier_count_before": int(inlier_count_before),
"inlier_count_after": int(inlier_count_after),
}
if error:
payload["error"] = True
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=iso_ts_from_clock(self._clock),
producer_id=_FDR_PRODUCER_ID,
kind=_FDR_KIND_FRAME_DONE,
payload=payload,
)
try:
result = self._fdr_client.enqueue(record)
except Exception as exc:
self._logger.debug(
"c3_5.refiner.fdr_enqueue_failed",
extra={
"kind": "c3_5.refiner.fdr_enqueue_failed",
"kv": {"frame_id": frame_id, "error": repr(exc)},
},
)
return
if result == EnqueueResult.OVERRUN:
self._logger.warning(
_LOG_KIND_FDR_OVERRUN,
extra={
"kind": _LOG_KIND_FDR_OVERRUN,
"kv": {"frame_id": frame_id, "record_kind": record.kind},
},
)
def _build_adhop_build_config() -> BuildConfig:
return BuildConfig(
precision=PrecisionMode.FP16,
workspace_mb=512,
calibration_dataset=None,
optimization_profiles=(),
)
def create(
config: "Config",
*,
ransac_filter: type["RansacFilter"],
inference_runtime: InferenceRuntimeCut,
clock: "Clock | None" = None,
fdr_client: "FdrClient | None" = None,
logger: logging.Logger | None = None,
) -> "AdHoPRefiner":
"""Module-level factory consumed by :func:`build_refiner_strategy`.
Loads the AdHoP engine ONCE at composition time; subsequent
``refine_if_needed`` calls re-use the resolved handle. The
composition root supplies ``clock`` and ``fdr_client``; both are
optional so unit-tests can drive the refiner with fakes.
"""
block = config.components["c3_5_adhop"]
weights_path = block.adhop_weights_path
if weights_path is None:
raise RefinerConfigError(
"AdHoPRefiner.create: config.components['c3_5_adhop']"
".adhop_weights_path is None; the runtime root MUST populate "
"the AdHoP engine path before constructing this strategy."
)
if clock is None:
from gps_denied_onboard.clock.wall_clock import WallClock
clock = WallClock()
if logger is None:
logger = logging.getLogger("gps_denied_onboard.c3_5_adhop.adhop_refiner")
cache_entry = inference_runtime.compile_engine(
weights_path, _build_adhop_build_config()
)
entry_for_deserialize = type(cache_entry)(
engine_path=cache_entry.engine_path,
sha256_hex=cache_entry.sha256_hex,
sm=cache_entry.sm,
jp=cache_entry.jp,
trt=cache_entry.trt,
precision=cache_entry.precision,
extras={**cache_entry.extras, "model_name": MODEL_NAME},
)
engine_handle = inference_runtime.deserialize_engine(entry_for_deserialize)
# Derive min_inliers floor from the C3 block — same component
# boundary as the matcher's RANSAC. Defaults to 4 if the C3
# block is absent (defensive — tests bypassing C3 registration).
try:
c3_block = config.components["c3_matcher"]
min_inliers_threshold = int(c3_block.min_inliers_threshold)
except KeyError:
min_inliers_threshold = 4
logger.info(
_LOG_KIND_READY,
extra={
"kind": _LOG_KIND_READY,
"kv": {
"strategy": _REFINEMENT_LABEL_ADHOP,
"residual_threshold_px": float(block.residual_threshold_px),
"invocation_rate_warn_threshold": float(
block.invocation_rate_warn_threshold
),
"ransac_threshold_px": float(block.ransac_threshold_px),
},
},
)
return AdHoPRefiner(
inference_runtime=inference_runtime,
engine_handle=engine_handle,
ransac_filter=ransac_filter,
invocation_rate_warn_threshold=block.invocation_rate_warn_threshold,
ratelimited_warn_window_ns=block.ratelimited_warn_window_ns,
ransac_threshold_px=block.ransac_threshold_px,
min_inliers_threshold=min_inliers_threshold,
clock=clock,
fdr_client=fdr_client,
logger=logger,
)
@@ -20,11 +20,31 @@ R10 tunable from operator orchestrator).
``invocation_rate_warn_threshold`` is the rolling-60 s
invocation-rate ceiling above which a WARN log fires
(C3.5-IT-03 / NFT-PERF-01). Must be in ``(0, 1)``; default 0.25.
``adhop_weights_path`` points at the OrthoLoC AdHoP TRT engine
produced offline by AZ-321. The composition-root factory passes
it through to :func:`AdHoPRefiner.create`. ``None`` is allowed as
a placeholder for ``strategy="passthrough"`` (or in tests that
inject a pre-built handle); the AdHoP factory raises
:class:`RefinerConfigError` if the path is missing AND
``strategy="adhop"``.
``ransac_threshold_px`` is the RANSAC reprojection-threshold the
refiner passes to the shared :class:`RansacFilter` when
re-filtering the AdHoP-preconditioned correspondences. Default
3.0 px (matches C3 / C4); same helper instance is identity-shared
across C3 / C3.5 / C4 per ``ransac_filter.md``.
``ratelimited_warn_window_ns`` is the floor between consecutive
``c3_5.refiner.invocation_rate_high`` WARN logs. Default 60 s
(AC-8 — "one per 60 s"). Surfacing this as a knob keeps tests
deterministic without monkey-patching ``time.monotonic_ns``.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Final
from gps_denied_onboard.config.schema import ConfigError
@@ -37,6 +57,8 @@ __all__ = [
KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset({"adhop", "passthrough"})
_DEFAULT_WARN_WINDOW_NS: Final[int] = 60 * 1_000_000_000
@dataclass(frozen=True)
class C3_5RefinerConfig:
@@ -45,6 +67,9 @@ class C3_5RefinerConfig:
strategy: str = "adhop"
residual_threshold_px: float = 2.5
invocation_rate_warn_threshold: float = 0.25
adhop_weights_path: Path | None = None
ransac_threshold_px: float = 3.0
ratelimited_warn_window_ns: int = _DEFAULT_WARN_WINDOW_NS
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STRATEGIES:
@@ -62,3 +87,20 @@ class C3_5RefinerConfig:
"C3_5RefinerConfig.invocation_rate_warn_threshold must be in "
f"(0, 1); got {self.invocation_rate_warn_threshold}"
)
if self.adhop_weights_path is not None and not isinstance(
self.adhop_weights_path, Path
):
raise ConfigError(
"C3_5RefinerConfig.adhop_weights_path must be a pathlib.Path "
f"or None; got {type(self.adhop_weights_path).__name__}"
)
if self.ransac_threshold_px <= 0.0:
raise ConfigError(
"C3_5RefinerConfig.ransac_threshold_px must be > 0; "
f"got {self.ransac_threshold_px}"
)
if self.ratelimited_warn_window_ns < 1:
raise ConfigError(
"C3_5RefinerConfig.ratelimited_warn_window_ns must be >= 1; "
f"got {self.ratelimited_warn_window_ns}"
)
@@ -0,0 +1,65 @@
"""C3.5's structural cut of C7 ``InferenceRuntime`` (AZ-507).
:class:`AdHoPRefiner` (AZ-349) calls into C7's inference runtime to
load the OrthoLoC AdHoP TRT engine and run forward passes during
refinement. Per AZ-507, ``c3_5_adhop`` MUST NOT import
``components.c7_inference`` directly; the consumer-side cut declares
the structural Protocol surface that c3.5 actually uses, and the
composition root binds the c7 runtime as the concrete implementation.
This Protocol mirrors the subset of
:class:`gps_denied_onboard.components.c7_inference.InferenceRuntime`
that the C3.5 refiner consumes — ``compile_engine``,
``deserialize_engine``, ``infer``, ``release_engine``, and
``current_runtime_label``. The full Protocol (which adds
``thermal_state``) is wider; the cut narrows to what C3.5 needs.
Mirrors :mod:`gps_denied_onboard.components.c2_vpr.inference_runtime_cut`
and :mod:`gps_denied_onboard.components.c3_matcher.inference_runtime_cut`
1:1 — each consumer component owns its own structural cut for
single-responsibility, so a future divergence in one consumer does
not silently widen the others' contracts.
DTOs (``BuildConfig``, ``EngineHandle``, ``EngineCacheEntry``) live in
:mod:`gps_denied_onboard._types.inference` (L1) and are imported here
directly — they are L1 shared types, not cross-component imports.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable
from gps_denied_onboard._types.inference import (
BuildConfig,
EngineCacheEntry,
EngineHandle,
)
if TYPE_CHECKING:
import numpy as np
__all__ = ["InferenceRuntimeCut"]
@runtime_checkable
class InferenceRuntimeCut(Protocol):
"""Subset of C7 ``InferenceRuntime`` consumed by C3.5 refiners."""
def compile_engine(
self, model_path: Path, build_config: BuildConfig
) -> EngineCacheEntry: ...
def deserialize_engine(self, entry: EngineCacheEntry) -> EngineHandle: ...
def infer(
self,
handle: EngineHandle,
inputs: dict[str, "np.ndarray"],
) -> dict[str, "np.ndarray"]: ...
def release_engine(self, handle: EngineHandle) -> None: ...
def current_runtime_label(
self,
) -> Literal["tensorrt", "onnx_trt_ep", "pytorch_fp16"]: ...