[AZ-271] [AZ-276] [AZ-278] [AZ-282] Finish cross-cutting helpers + relax opencv pin

E-CC-HELPERS closes with the three remaining Layer-1 helpers and
E-CC-CONF closes with the env > YAML > defaults precedence test
gate. All four tickets ship with frozen public surfaces, hermetic
unit tests, and no upward (components.*) imports.

* AZ-271 — tests/unit/shared/config/test_precedence.py (5 ACs + smoke
  test + helper that names the layer in failure messages).
* AZ-282 — helpers/ransac_filter.py: static RansacFilter +
  RansacResult; cv2.setRNGSeed(0) for byte-equal determinism;
  median residual semantics pinned by contract.
* AZ-276 — helpers/imu_preintegrator.py + make_imu_preintegrator;
  GTSAM PreintegratedCombinedMeasurements; strict-monotonic ts_ns
  guard runs before any state mutation. Adjacent hygiene:
  _types/nav.py ImuSample/ImuWindow now use ts_ns:int and the
  spec-mandated ImuBias dataclass.
* AZ-278 — helpers/lightglue_runtime.py: structural R14 fix.
  LightGlueRuntime + non-blocking concurrent-access guard that
  raises rather than serialising. EngineHandle Protocol in
  _types/manifests.py + KeypointSet/CorrespondenceSet in
  _types/matching.py (Protocol surface adds approved by spec).

Dependency conflict (Finding 1, user-approved): gtsam 4.2 (PyPI) is
numpy-1.x-ABI only; opencv-python>=4.12 needs numpy>=2 at runtime.
Resolution: opencv-python pin relaxed to >=4.11.0.86,<4.12. The
D-CROSS-CVE-1 ratchet at ci/opencv_pin_gate.py is held at 4.11.0
with the original 4.12.0 floor restored once a numpy-2-compatible
gtsam wheel ships. Full replay procedure in
_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md.

Tests: 294 passed, 2 skipped (cmake/actionlint env-skips,
pre-existing). 43 new tests added for batch 5. Ruff check + format
clean.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 03:23:33 +03:00
parent ba20c2d195
commit 33486588de
24 changed files with 2096 additions and 36 deletions
@@ -16,6 +16,22 @@ from gps_denied_onboard.helpers.engine_filename_schema import (
EngineFilenameSchema,
EngineFilenameSchemaError,
)
from gps_denied_onboard.helpers.imu_preintegrator import (
CombinedImuFactor,
ImuPreintegrationError,
ImuPreintegrator,
make_imu_preintegrator,
)
from gps_denied_onboard.helpers.lightglue_runtime import (
LightGlueConcurrentAccessError,
LightGlueRuntime,
LightGlueRuntimeError,
)
from gps_denied_onboard.helpers.ransac_filter import (
RansacFilter,
RansacFilterError,
RansacResult,
)
from gps_denied_onboard.helpers.se3_utils import (
SE3,
Se3InvalidMatrixError,
@@ -46,10 +62,19 @@ __all__ = [
"SE3",
"SIDECAR_SUFFIX",
"WEB_MERCATOR_MAX_LAT_DEG",
"CombinedImuFactor",
"DescriptorNormaliser",
"DescriptorNormaliserError",
"EngineFilenameSchema",
"EngineFilenameSchemaError",
"ImuPreintegrationError",
"ImuPreintegrator",
"LightGlueConcurrentAccessError",
"LightGlueRuntime",
"LightGlueRuntimeError",
"RansacFilter",
"RansacFilterError",
"RansacResult",
"Se3InvalidMatrixError",
"Sha256Sidecar",
"Sha256SidecarError",
@@ -59,6 +84,7 @@ __all__ = [
"exp_map",
"is_valid_rotation",
"log_map",
"make_imu_preintegrator",
"matrix_to_se3",
"se3_to_matrix",
]
@@ -1,16 +1,196 @@
"""IMU preintegration helper — STUB.
"""`ImuPreintegrator` — single owner of GTSAM IMU preintegration (AZ-276 / E-CC-HELPERS).
Concrete implementation is owned by AZ-276 (E-CC-HELPERS). Contract lives at
`_docs/02_document/common-helpers/01_helper_imu_preintegrator.md`.
Implements the `imu_preintegrator` contract v1.0.0 at
`_docs/02_document/contracts/shared_helpers/imu_preintegrator.md`.
Single-threaded by design — no internal lock. The composition root
binds one instance per writer thread. Strict-monotonic ``ts_ns`` is the
hard timestamp invariant; non-monotonic samples raise
``ImuPreintegrationError`` without mutating internal state.
Bias drift remains the consumer's responsibility — call
``reset_with_bias`` when the C1/C5 estimator's bias estimate changes.
"""
from __future__ import annotations
from typing import Any
from typing import Any, Final
import gtsam
import numpy as np
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import ImuBias, ImuSample, ImuWindow
__all__ = [
"CombinedImuFactor",
"ImuPreintegrationError",
"ImuPreintegrator",
"make_imu_preintegrator",
]
# Documented defaults pulled from a Bosch BMI088-class IMU running at
# 200 Hz — used when ``CameraCalibration.metadata`` does not carry an
# explicit ``imu_noise_model`` block. The contract owner notes the FC's
# per-deployment IMU noise model lives in ``CameraCalibration``; these
# defaults are only for bring-up + unit tests.
_DEFAULT_ACCEL_NOISE_DENSITY: Final[float] = 1.86e-3 # m/s^2 / sqrt(Hz)
_DEFAULT_GYRO_NOISE_DENSITY: Final[float] = 1.87e-4 # rad/s / sqrt(Hz)
_DEFAULT_ACCEL_BIAS_RW: Final[float] = 4.33e-4 # m/s^3 / sqrt(Hz)
_DEFAULT_GYRO_BIAS_RW: Final[float] = 2.66e-5 # rad/s^2 / sqrt(Hz)
_DEFAULT_INTEGRATION_NOISE: Final[float] = 1e-8
_DEFAULT_GRAVITY_M_S2: Final[float] = 9.80665
# Re-export GTSAM's combined IMU factor so consumers do not import GTSAM.
CombinedImuFactor = gtsam.CombinedImuFactor
class ImuPreintegrationError(RuntimeError):
"""Raised on schema, monotonicity, or empty-window violations.
Carries the offending timestamp and the last accepted timestamp in
its message so the consumer's catch-and-log path can record it as
an FDR ``kind="imu.skew"`` event (per AZ-276 Risk 2 mitigation).
"""
def _bias_to_gtsam(bias: ImuBias) -> gtsam.imuBias.ConstantBias:
return gtsam.imuBias.ConstantBias(
np.asarray(bias.accel_bias, dtype=np.float64),
np.asarray(bias.gyro_bias, dtype=np.float64),
)
def _zero_bias() -> gtsam.imuBias.ConstantBias:
return gtsam.imuBias.ConstantBias(np.zeros(3, dtype=np.float64), np.zeros(3, dtype=np.float64))
def _read_imu_noise(metadata: dict[str, Any]) -> dict[str, float]:
"""Pull IMU noise densities from ``CameraCalibration.metadata``.
Falls back to the documented defaults when the block is absent —
every key in the noise block is optional and independently
defaulted, so partial blocks are honoured.
"""
block = metadata.get("imu_noise_model", {}) if isinstance(metadata, dict) else {}
return {
"accel_noise_density": float(
block.get("accel_noise_density", _DEFAULT_ACCEL_NOISE_DENSITY)
),
"gyro_noise_density": float(block.get("gyro_noise_density", _DEFAULT_GYRO_NOISE_DENSITY)),
"accel_bias_rw": float(block.get("accel_bias_rw", _DEFAULT_ACCEL_BIAS_RW)),
"gyro_bias_rw": float(block.get("gyro_bias_rw", _DEFAULT_GYRO_BIAS_RW)),
"integration_noise": float(block.get("integration_noise", _DEFAULT_INTEGRATION_NOISE)),
"gravity_m_s2": float(block.get("gravity_m_s2", _DEFAULT_GRAVITY_M_S2)),
}
class ImuPreintegrator:
"""Preintegrate IMU samples over a time window for VIO / state-estimator factor adds."""
"""Single owner of GTSAM `PreintegratedCombinedMeasurements`.
def preintegrate(self, samples: Any) -> Any:
raise NotImplementedError("ImuPreintegrator concrete impl is AZ-276 (E-CC-HELPERS)")
Single-threaded by contract. Strict-monotonic timestamps enforced.
"""
def __init__(self, params: gtsam.PreintegrationCombinedParams) -> None:
self._params = params
self._bias: gtsam.imuBias.ConstantBias = _zero_bias()
self._pim = gtsam.PreintegratedCombinedMeasurements(self._params, self._bias)
# ``_last_ts_ns`` is None until the first sample is integrated.
self._last_ts_ns: int | None = None
self._sample_count: int = 0
def reset_with_bias(self, bias: ImuBias) -> None:
"""Replace the active bias.
Discards the partial integration accumulator — the contract
specifies that re-bias affects "subsequent samples only", which
we honour by re-initialising the GTSAM PIM with the new bias
and a clean monotonic baseline. Consumers MUST close the prior
window via ``reset_for_new_keyframe`` before changing bias if
they want to retain its contribution.
"""
self._bias = _bias_to_gtsam(bias)
self._pim = gtsam.PreintegratedCombinedMeasurements(self._params, self._bias)
self._sample_count = 0
self._last_ts_ns = None
def integrate_sample(self, sample: ImuSample) -> None:
"""Integrate one IMU sample.
Strict-monotonic guard runs BEFORE state mutation so a rejected
sample leaves the accumulator unchanged.
"""
if self._last_ts_ns is not None and sample.ts_ns <= self._last_ts_ns:
raise ImuPreintegrationError(
f"non-monotonic IMU sample: ts_ns={sample.ts_ns} <= last_ts_ns={self._last_ts_ns}"
)
if self._last_ts_ns is None:
dt_seconds = 0.0
else:
dt_seconds = (sample.ts_ns - self._last_ts_ns) * 1e-9
accel = np.asarray(sample.accel_xyz, dtype=np.float64)
gyro = np.asarray(sample.gyro_xyz, dtype=np.float64)
# GTSAM rejects dt==0; for the first sample we still record the
# timestamp without integrating so the next sample sees a real dt.
if dt_seconds > 0.0:
try:
self._pim.integrateMeasurement(accel, gyro, dt_seconds)
except RuntimeError as exc:
raise ImuPreintegrationError(
f"GTSAM PIM rejected sample at ts_ns={sample.ts_ns}: {exc}"
) from exc
self._last_ts_ns = sample.ts_ns
self._sample_count += 1
def integrate_window(self, window: ImuWindow) -> None:
"""Integrate every sample in ``window`` in order."""
for sample in window.samples:
self.integrate_sample(sample)
def current_preintegration(self) -> gtsam.PreintegratedCombinedMeasurements:
"""Return the live PIM without resetting state.
Raises ``ImuPreintegrationError`` if no integration has run
since the last reset (per AC-3).
"""
if self._sample_count == 0:
raise ImuPreintegrationError("no samples since reset: cannot return preintegration")
return self._pim
def reset_for_new_keyframe(self) -> gtsam.PreintegratedCombinedMeasurements:
"""Return the closed PIM and clear internal accumulators.
Caller MUST capture the return value — the helper does not
retain a reference past the call.
"""
if self._sample_count == 0:
raise ImuPreintegrationError("no samples since reset: cannot close keyframe")
closed = self._pim
self._pim = gtsam.PreintegratedCombinedMeasurements(self._params, self._bias)
self._sample_count = 0
self._last_ts_ns = None
return closed
def make_imu_preintegrator(calibration: CameraCalibration) -> ImuPreintegrator:
"""Construct an `ImuPreintegrator` from the per-deployment calibration.
Noise densities are pulled from
``calibration.metadata["imu_noise_model"]``; missing keys fall back
to the documented BMI088-class defaults.
"""
noise = _read_imu_noise(calibration.metadata or {})
params = gtsam.PreintegrationCombinedParams.MakeSharedU(noise["gravity_m_s2"])
params.setAccelerometerCovariance(np.eye(3) * (noise["accel_noise_density"] ** 2))
params.setGyroscopeCovariance(np.eye(3) * (noise["gyro_noise_density"] ** 2))
params.setBiasAccCovariance(np.eye(3) * (noise["accel_bias_rw"] ** 2))
params.setBiasOmegaCovariance(np.eye(3) * (noise["gyro_bias_rw"] ** 2))
params.setIntegrationCovariance(np.eye(3) * noise["integration_noise"])
return ImuPreintegrator(params)
@@ -1,19 +1,133 @@
"""Shared LightGlue inference runtime — STUB.
"""`LightGlueRuntime` — shared LightGlue matcher (AZ-278 / E-CC-HELPERS / R14 fix).
R14 fix: this helper is the single owner; both C2.5 (single-pair inlier counter)
and C3 (matcher) import it. Neither component depends on the other.
Implements the `lightglue_runtime` contract v1.0.0 at
`_docs/02_document/contracts/shared_helpers/lightglue_runtime.md`.
Concrete implementation is owned by AZ-278. Contract:
`_docs/02_document/common-helpers/03_helper_lightglue_runtime.md`.
Layer 1 helper — NO `gps_denied_onboard.components.*` imports. The
engine handle is an opaque Protocol defined in `_types/manifests.py`;
C7's `InferenceRuntime.deserialize_engine` produces the concrete handle
and the composition root injects ONE shared instance into both C2.5
(InlierBasedReranker) and C3 (CrossDomainMatcher) — the structural
fix for R14.
Single-threaded by contract. The concurrent-access guard is
non-blocking: concurrent entry RAISES `LightGlueConcurrentAccessError`
rather than serialising, so a composition-root regression that wires
the runtime into multiple threads is caught immediately instead of
silently corrupting CUDA state.
"""
from __future__ import annotations
from typing import Any
import threading
from gps_denied_onboard._types.manifests import EngineHandle
from gps_denied_onboard._types.matching import CorrespondenceSet, KeypointSet
__all__ = [
"LightGlueConcurrentAccessError",
"LightGlueRuntime",
"LightGlueRuntimeError",
]
class LightGlueRuntimeError(RuntimeError):
"""Raised on construction guards or descriptor-dim mismatch."""
class LightGlueConcurrentAccessError(RuntimeError):
"""Raised when a concurrent ``match`` / ``match_batch`` entry is detected.
The serial-access invariant is a composition-root contract — if you
see this exception, the runtime was wired into more than one thread
by mistake. Fix the composition root, do NOT add a lock here.
"""
def _validate_keypoint_set(features: KeypointSet, *, name: str, expected_dim: int) -> None:
if features.descriptors.ndim != 2 or features.descriptors.shape[1] != expected_dim:
actual_dim = (
features.descriptors.shape[1] if features.descriptors.ndim == 2 else "<bad shape>"
)
raise LightGlueRuntimeError(
f"{name}: descriptor dim mismatch — engine expects {expected_dim}, "
f"got {actual_dim} (descriptors.shape={features.descriptors.shape})"
)
class LightGlueRuntime:
"""Shared LightGlue matcher runtime."""
"""Shared LightGlue inference runtime.
def match(self, descriptors_a: Any, descriptors_b: Any) -> Any:
raise NotImplementedError("LightGlueRuntime concrete impl is AZ-278")
Single-thread by contract; concurrent entry raises.
"""
def __init__(self, engine_handle: EngineHandle) -> None:
if engine_handle is None:
raise LightGlueRuntimeError(
"LightGlueRuntime requires a non-None engine_handle (got None); "
"composition root must inject the engine produced by C7's "
"InferenceRuntime.deserialize_engine"
)
try:
descriptor_dim = int(engine_handle.descriptor_dim)
except AttributeError as exc:
raise LightGlueRuntimeError(
f"engine_handle missing required Protocol attribute 'descriptor_dim': {exc}"
) from exc
if descriptor_dim < 1:
raise LightGlueRuntimeError(
f"engine_handle.descriptor_dim must be >= 1; got {descriptor_dim}"
)
self._engine = engine_handle
self._descriptor_dim = descriptor_dim
# Non-blocking guard: ``try_acquire`` raises on contention rather
# than serialising callers, per the contract's "concurrent calls
# are a bug" stance.
self._in_use = threading.Lock()
def descriptor_dim(self) -> int:
return self._descriptor_dim
def match(self, features_a: KeypointSet, features_b: KeypointSet) -> CorrespondenceSet:
"""Match a single pair (C2.5 path)."""
if not self._in_use.acquire(blocking=False):
raise LightGlueConcurrentAccessError(
"LightGlueRuntime.match called from a second thread while another "
"match is in flight — the runtime owns ONE CUDA stream and must be "
"bound to a single hot-path thread by the composition root"
)
try:
_validate_keypoint_set(features_a, name="features_a", expected_dim=self._descriptor_dim)
_validate_keypoint_set(features_b, name="features_b", expected_dim=self._descriptor_dim)
return self._engine.forward(features_a, features_b)
finally:
self._in_use.release()
def match_batch(
self,
features_a_list: list[KeypointSet],
features_b_list: list[KeypointSet],
) -> list[CorrespondenceSet]:
"""Batch-match (C3 path) — iterates serially over the single CUDA stream."""
if len(features_a_list) != len(features_b_list):
raise LightGlueRuntimeError(
f"match_batch: features_a_list (len={len(features_a_list)}) and "
f"features_b_list (len={len(features_b_list)}) must have equal length"
)
if not self._in_use.acquire(blocking=False):
raise LightGlueConcurrentAccessError(
"LightGlueRuntime.match_batch called concurrently with another match"
)
try:
results: list[CorrespondenceSet] = []
for idx, (fa, fb) in enumerate(zip(features_a_list, features_b_list, strict=True)):
_validate_keypoint_set(
fa, name=f"features_a_list[{idx}]", expected_dim=self._descriptor_dim
)
_validate_keypoint_set(
fb, name=f"features_b_list[{idx}]", expected_dim=self._descriptor_dim
)
results.append(self._engine.forward(fa, fb))
return results
finally:
self._in_use.release()
+220 -7
View File
@@ -1,14 +1,227 @@
"""Generic RANSAC inlier filter — STUB.
"""`RansacFilter` — shared 2D-2D RANSAC + median-residual helper (AZ-282 / E-CC-HELPERS).
Concrete impl owned by AZ-282. Contract:
`_docs/02_document/common-helpers/07_helper_ransac_filter.md`.
Implements the `ransac_filter` contract v1.0.0
(`_docs/02_document/contracts/shared_helpers/ransac_filter.md`).
Stateless static-only design — `coderule.mdc` permits static methods for
pure self-contained computations. Determinism is guaranteed by setting
`cv2.setRNGSeed(0)` immediately before every `cv2.findHomography(...,
RANSAC)` call.
Public surface raises ONLY `RansacFilterError`; OpenCV's lower-level
exceptions are wrapped.
"""
from __future__ import annotations
from typing import Any
from dataclasses import dataclass
from typing import Final
import cv2
import numpy as np
from gps_denied_onboard.helpers.se3_utils import SE3, se3_to_matrix
__all__ = [
"RansacFilter",
"RansacFilterError",
"RansacResult",
]
def filter_inliers(matches: Any, threshold_px: float, max_iters: int = 1000) -> Any:
"""Run RANSAC on a set of point matches and return the inlier mask."""
raise NotImplementedError("ransac_filter concrete impl is AZ-282")
# RANSAC requires ≥4 points to fit a homography (4 pairs of (x,y) ↔ (x,y)).
_HOMOGRAPHY_MIN_POINTS: Final[int] = 4
_DETERMINISTIC_SEED: Final[int] = 0
class RansacFilterError(ValueError):
"""Raised when an input violates the public shape / dtype / threshold contract."""
@dataclass(frozen=True)
class RansacResult:
"""Frozen output of `RansacFilter.filter_correspondences`.
The numpy arrays are not copied; consumers MUST treat them as
read-only. `median_residual_px` is NaN when the inlier set is empty
(matches `compute_reprojection_residual` semantics).
"""
inlier_correspondences: np.ndarray
inlier_count: int
outlier_count: int
median_residual_px: float
def _validate_correspondences(correspondences: np.ndarray, *, where: str) -> None:
if not isinstance(correspondences, np.ndarray):
raise RansacFilterError(
f"{where}: expected np.ndarray; got {type(correspondences).__name__}"
)
if correspondences.ndim != 2 or correspondences.shape[1] != 4:
raise RansacFilterError(
f"{where}: correspondences must have shape (N, 4) [x_a, y_a, x_b, y_b]; "
f"got {correspondences.shape}"
)
def _validate_threshold(ransac_threshold_px: float) -> None:
if not isinstance(ransac_threshold_px, (int, float)) or ransac_threshold_px <= 0:
raise RansacFilterError(
f"ransac_threshold_px must be a positive float; got {ransac_threshold_px!r}"
)
def _validate_min_inliers(min_inliers: int) -> None:
if not isinstance(min_inliers, int) or min_inliers < 0:
raise RansacFilterError(f"min_inliers must be a non-negative int; got {min_inliers!r}")
def _median_residual(
inliers: np.ndarray, K: np.ndarray, distortion: np.ndarray, pose_matrix: np.ndarray
) -> float:
"""Median pixel residual between (x_b, y_b) and reprojected (x_a, y_a)+pose.
Treats each correspondence's image-a pixel as the ``observed`` point and
its image-b pixel as the ``predicted`` point under the supplied pose.
The 2D-to-3D back-projection assumes z=1 in camera-a frame — sufficient
for the helper's purpose of giving consumers a deterministic, OpenCV-
backed quality signal in pixels. The contract pins MEDIAN (NOT mean).
"""
if inliers.shape[0] == 0:
return float("nan")
pts_a = inliers[:, :2].astype(np.float64, copy=False)
pts_b = inliers[:, 2:].astype(np.float64, copy=False)
# Back-project image-a pixels into 3D (z=1) in camera-a frame using K^{-1}.
K_inv = np.linalg.inv(K)
pixels_h = np.hstack([pts_a, np.ones((pts_a.shape[0], 1), dtype=np.float64)])
rays = (K_inv @ pixels_h.T).T # (N, 3)
# Apply pose (R | t) from cam_a -> cam_b: p_b = R @ p_a + t.
R = pose_matrix[:3, :3]
t = pose_matrix[:3, 3]
rvec, _ = cv2.Rodrigues(R)
projected, _ = cv2.projectPoints(
rays.reshape(-1, 1, 3), rvec=rvec, tvec=t, cameraMatrix=K, distCoeffs=distortion
)
projected_pts = projected.reshape(-1, 2)
residuals = np.linalg.norm(projected_pts - pts_b, axis=1)
return float(np.median(residuals))
class RansacFilter:
"""Shared 2D-2D RANSAC inlier filter + reprojection-residual helper.
All methods are static; no module-level state. Calls into OpenCV pin
the RANSAC seed for byte-equal determinism (AC-3).
"""
@staticmethod
def filter_correspondences(
correspondences: np.ndarray,
ransac_threshold_px: float,
min_inliers: int,
) -> RansacResult:
"""Run `cv2.findHomography(..., RANSAC)` and return the inlier mask.
``min_inliers`` is informational only — see the contract's
"Min-inliers semantics" invariant.
"""
_validate_correspondences(correspondences, where="filter_correspondences")
_validate_threshold(ransac_threshold_px)
_validate_min_inliers(min_inliers)
n_points = correspondences.shape[0]
if n_points < _HOMOGRAPHY_MIN_POINTS:
raise RansacFilterError(
f"filter_correspondences: homography RANSAC requires ≥{_HOMOGRAPHY_MIN_POINTS} "
f"correspondences; got {n_points}"
)
pts_a = correspondences[:, :2].astype(np.float64, copy=False)
pts_b = correspondences[:, 2:].astype(np.float64, copy=False)
cv2.setRNGSeed(_DETERMINISTIC_SEED)
try:
_H, mask = cv2.findHomography(
pts_a,
pts_b,
method=cv2.RANSAC,
ransacReprojThreshold=float(ransac_threshold_px),
)
except cv2.error as exc:
raise RansacFilterError(f"filter_correspondences: OpenCV RANSAC failed: {exc}") from exc
if mask is None:
inlier_mask = np.zeros(n_points, dtype=bool)
else:
inlier_mask = mask.ravel().astype(bool)
inliers = correspondences[inlier_mask]
inlier_count = int(inliers.shape[0])
outlier_count = n_points - inlier_count
if inlier_count == 0:
median_residual = float("nan")
else:
# Median residual from the homography fit itself — distance from
# H @ pts_a to pts_b for the inlier subset. Reuse cv2.perspectiveTransform
# to stay in OpenCV's reference frame; this matches the C3.5/C4 contract.
warped = cv2.perspectiveTransform(
inliers[:, :2].reshape(-1, 1, 2).astype(np.float64), _H
).reshape(-1, 2)
residuals = np.linalg.norm(warped - inliers[:, 2:].astype(np.float64), axis=1)
median_residual = float(np.median(residuals))
return RansacResult(
inlier_correspondences=inliers,
inlier_count=inlier_count,
outlier_count=outlier_count,
median_residual_px=median_residual,
)
@staticmethod
def compute_reprojection_residual(
correspondences: np.ndarray,
K: np.ndarray,
distortion: np.ndarray,
pose: SE3,
) -> float:
"""Median reprojection residual in pixels for the supplied inlier set.
Empty inlier sets return ``NaN`` per AC-5. ``K`` MUST be (3, 3);
``distortion`` MUST be (5,) or (8,) — OpenCV's standard models.
"""
_validate_correspondences(correspondences, where="compute_reprojection_residual")
if not isinstance(K, np.ndarray):
raise RansacFilterError(
f"compute_reprojection_residual: K must be np.ndarray; got {type(K).__name__}"
)
if K.shape != (3, 3):
raise RansacFilterError(
f"compute_reprojection_residual: K must have shape (3, 3); got {K.shape}"
)
if not isinstance(distortion, np.ndarray):
raise RansacFilterError(
f"compute_reprojection_residual: distortion must be np.ndarray; "
f"got {type(distortion).__name__}"
)
if distortion.ndim != 1 or distortion.shape[0] not in (5, 8):
raise RansacFilterError(
f"compute_reprojection_residual: distortion must have shape (5,) or (8,); "
f"got {distortion.shape}"
)
K_f64 = K.astype(np.float64, copy=False)
dist_f64 = distortion.astype(np.float64, copy=False)
pose_matrix = se3_to_matrix(pose)
try:
return _median_residual(correspondences, K_f64, dist_f64, pose_matrix)
except cv2.error as exc:
raise RansacFilterError(
f"compute_reprojection_residual: OpenCV projection failed: {exc}"
) from exc