Files
gps-denied-onboard/tests/unit/test_az276_imu_preintegrator.py
T
Oleksandr Bezdieniezhnykh 33486588de [AZ-271] [AZ-276] [AZ-278] [AZ-282] Finish cross-cutting helpers + relax opencv pin
E-CC-HELPERS closes with the three remaining Layer-1 helpers and
E-CC-CONF closes with the env > YAML > defaults precedence test
gate. All four tickets ship with frozen public surfaces, hermetic
unit tests, and no upward (components.*) imports.

* AZ-271 — tests/unit/shared/config/test_precedence.py (5 ACs + smoke
  test + helper that names the layer in failure messages).
* AZ-282 — helpers/ransac_filter.py: static RansacFilter +
  RansacResult; cv2.setRNGSeed(0) for byte-equal determinism;
  median residual semantics pinned by contract.
* AZ-276 — helpers/imu_preintegrator.py + make_imu_preintegrator;
  GTSAM PreintegratedCombinedMeasurements; strict-monotonic ts_ns
  guard runs before any state mutation. Adjacent hygiene:
  _types/nav.py ImuSample/ImuWindow now use ts_ns:int and the
  spec-mandated ImuBias dataclass.
* AZ-278 — helpers/lightglue_runtime.py: structural R14 fix.
  LightGlueRuntime + non-blocking concurrent-access guard that
  raises rather than serialising. EngineHandle Protocol in
  _types/manifests.py + KeypointSet/CorrespondenceSet in
  _types/matching.py (Protocol surface additions approved by spec).

Dependency conflict (Finding 1, user-approved): gtsam 4.2 (PyPI) is
numpy-1.x-ABI only; opencv-python>=4.12 needs numpy>=2 at runtime.
Resolution: opencv-python pin relaxed to >=4.11.0.86,<4.12. The
D-CROSS-CVE-1 ratchet at ci/opencv_pin_gate.py is held at 4.11.0;
the original 4.12.0 floor will be restored once a numpy-2-compatible
gtsam wheel ships. Full replay procedure in
_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md.

Tests: 294 passed, 2 skipped (cmake/actionlint env-skips,
pre-existing). 43 new tests added for batch 5. Ruff check + format
clean.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 03:23:33 +03:00

272 lines
9.0 KiB
Python

"""AZ-276 — `ImuPreintegrator` AC suite (E-CC-HELPERS).
Covers the 7 ACs from `_docs/02_tasks/todo/AZ-276_imu_preintegrator.md`.
"""
from __future__ import annotations
import ast
from pathlib import Path
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import ImuBias, ImuSample, ImuWindow
from gps_denied_onboard.helpers import (
CombinedImuFactor,
ImuPreintegrationError,
ImuPreintegrator,
make_imu_preintegrator,
)
def _calibration() -> CameraCalibration:
    """Build the fixed ``CameraCalibration`` passed to every preintegrator under test.

    The intrinsics and the body-to-camera transform are identity matrices and
    the five distortion coefficients are zero, so the fixture carries no
    calibration-specific numbers of its own.
    """
    identity_intrinsics = np.eye(3, dtype=np.float64)
    zero_distortion = np.zeros(5, dtype=np.float64)
    identity_body_to_camera = np.eye(4, dtype=np.float64)
    return CameraCalibration(
        camera_id="test_cam",
        intrinsics_3x3=identity_intrinsics,
        distortion=zero_distortion,
        body_to_camera_se3=identity_body_to_camera,
        acquisition_method="lab_calibration",
        metadata={},
    )
def _make_samples(n: int, start_ts_ns: int = 0, dt_ns: int = 5_000_000) -> tuple[ImuSample, ...]:
    """Return ``n`` samples spaced ``dt_ns`` apart, the first stamped ``start_ts_ns``.

    Every sample carries the same reading: ``(0, 0, 9.80665)`` on the
    accelerometer and zeros on the gyroscope. With a positive ``dt_ns`` the
    timestamps increase strictly; the default 5 ms spacing corresponds to 200 Hz.
    """
    constant_accel = (0.0, 0.0, 9.80665)
    zero_gyro = (0.0, 0.0, 0.0)
    collected: list[ImuSample] = []
    for index in range(n):
        timestamp_ns = start_ts_ns + index * dt_ns
        collected.append(
            ImuSample(ts_ns=timestamp_ns, accel_xyz=constant_accel, gyro_xyz=zero_gyro)
        )
    return tuple(collected)
# ---------------------------------------------------------------------------
# AC-1: round-trip preintegration.
def test_ac1_round_trip_preintegration() -> None:
    """AC-1: integrating 100 samples yields a factor that spans the sample window."""
    # Arrange
    preintegrator = make_imu_preintegrator(_calibration())
    samples = _make_samples(100)
    first_sample, last_sample = samples[0], samples[-1]
    window_s = (last_sample.ts_ns - first_sample.ts_ns) * 1e-9

    # Act
    for sample in samples:
        preintegrator.integrate_sample(sample)
    factor = preintegrator.current_preintegration()

    # Assert — the integrated time equals the first-to-last timestamp span.
    assert factor.deltaTij() == pytest.approx(window_s, abs=1e-9)

    # Each sample reads 9.80665 on the accelerometer Z axis with zero gyro, so
    # the accumulated translation is expected to be non-zero; that is all the
    # "non-zero delta_pose" gate asks for, hence no exact value is pinned.
    translation = np.asarray(factor.deltaPij())
    assert np.linalg.norm(translation) > 0.0
# ---------------------------------------------------------------------------
# AC-2: strict monotonicity rejection leaves state unchanged.
def test_ac2_non_monotonic_rejection_preserves_state() -> None:
    """AC-2: a sample whose timestamp does not advance is rejected without side effects.

    Covers both sides of the strict-monotonic boundary: a timestamp earlier
    than the last accepted sample and one equal to it. After each rejection the
    accumulated ``deltaTij`` must be unchanged, and a later valid sample must
    still integrate.
    """
    # Arrange
    pre = make_imu_preintegrator(_calibration())
    samples = _make_samples(10)
    for s in samples:
        pre.integrate_sample(s)
    last_ts_ns = samples[-1].ts_ns
    snapshot_dt = pre.current_preintegration().deltaTij()

    # Act / Assert — "strict" means an equal timestamp is refused as well as an
    # earlier one, so exercise both.
    for bad_ts_ns in (last_ts_ns - 1, last_ts_ns):
        bad_sample = ImuSample(
            ts_ns=bad_ts_ns,
            accel_xyz=(0.0, 0.0, 9.80665),
            gyro_xyz=(0.0, 0.0, 0.0),
        )
        with pytest.raises(ImuPreintegrationError, match="non-monotonic"):
            pre.integrate_sample(bad_sample)
        # State unchanged — deltaTij is the same as before the bad sample.
        assert pre.current_preintegration().deltaTij() == pytest.approx(snapshot_dt)

    # Subsequent valid sample integrates normally.
    next_good = ImuSample(
        ts_ns=last_ts_ns + 5_000_000,
        accel_xyz=(0.0, 0.0, 9.80665),
        gyro_xyz=(0.0, 0.0, 0.0),
    )
    pre.integrate_sample(next_good)
    assert pre.current_preintegration().deltaTij() > snapshot_dt
# ---------------------------------------------------------------------------
# AC-3: reset_for_new_keyframe is destructive.
def test_ac3_reset_for_new_keyframe_is_destructive() -> None:
    """AC-3: closing a keyframe returns the accumulated factor and empties the accumulator."""
    # Arrange
    preintegrator = make_imu_preintegrator(_calibration())
    samples = _make_samples(50)
    for sample in samples:
        preintegrator.integrate_sample(sample)

    # Act
    closed_factor = preintegrator.reset_for_new_keyframe()

    # Assert — the returned factor covers the full first-to-last sample span.
    span_s = (samples[-1].ts_ns - samples[0].ts_ns) * 1e-9
    assert closed_factor.deltaTij() == pytest.approx(span_s)

    # With nothing integrated since the reset, asking for the current factor fails.
    with pytest.raises(ImuPreintegrationError, match="no samples"):
        preintegrator.current_preintegration()
# ---------------------------------------------------------------------------
# AC-4: re-bias affects subsequent samples only.
def test_ac4_rebias_affects_subsequent_samples_only() -> None:
    """AC-4: the bias installed via ``reset_with_bias`` shapes the integration that follows.

    Two fresh preintegrators receive the same 50 samples after being reset
    with different accelerometer biases; their integrated translations must
    differ.
    """
    # Arrange
    samples = _make_samples(50)
    zero_bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
    x_accel_bias = ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))

    def integrated_translation(bias: ImuBias) -> np.ndarray:
        """Integrate ``samples`` on a fresh preintegrator reset with ``bias``."""
        preintegrator = make_imu_preintegrator(_calibration())
        preintegrator.reset_with_bias(bias)
        for sample in samples:
            preintegrator.integrate_sample(sample)
        return np.asarray(preintegrator.current_preintegration().deltaPij())

    # Act
    translation_zero_bias = integrated_translation(zero_bias)
    translation_x_bias = integrated_translation(x_accel_bias)

    # Assert — a different bias must give a different integrated translation.
    # NOTE(review): neither run calls reset_with_bias after samples have been
    # integrated, so the "subsequent samples only" half of the AC is covered
    # only indirectly — confirm whether a mid-stream case is wanted.
    assert not np.allclose(translation_zero_bias, translation_x_bias)
# ---------------------------------------------------------------------------
# AC-5: determinism — two instances, same input → deep-equal factors.
def test_ac5_determinism_across_instances() -> None:
    """AC-5: two preintegrators fed the same samples produce exactly equal factors."""
    # Arrange
    calibration = _calibration()
    samples = _make_samples(80, start_ts_ns=1_000_000_000)
    first = make_imu_preintegrator(calibration)
    second = make_imu_preintegrator(calibration)

    # Act — feed both instances in lockstep, one sample at a time.
    for sample in samples:
        for preintegrator in (first, second):
            preintegrator.integrate_sample(sample)
    factor_1 = first.current_preintegration()
    factor_2 = second.current_preintegration()

    # Assert — exact equality, no tolerance: time, position, then velocity.
    assert factor_1.deltaTij() == factor_2.deltaTij()
    position_1 = np.asarray(factor_1.deltaPij())
    position_2 = np.asarray(factor_2.deltaPij())
    np.testing.assert_array_equal(position_1, position_2)
    velocity_1 = np.asarray(factor_1.deltaVij())
    velocity_2 = np.asarray(factor_2.deltaVij())
    np.testing.assert_array_equal(velocity_1, velocity_2)
# ---------------------------------------------------------------------------
# AC-6: no lock acquisition on the integration path.
def test_ac6_no_internal_locks() -> None:
    """AC-6: ``helpers/imu_preintegrator.py`` must not use locking primitives.

    This is a static check on the module's source text, not a runtime probe.
    It rejects the dotted spellings ``threading.Lock`` / ``threading.RLock``,
    any occurrence of ``Semaphore`` or ``mutex``, and ``Lock`` / ``RLock``
    pulled in through ``from threading import ...``.
    """
    # Arrange
    module_path = (
        Path(__file__).resolve().parents[2]
        / "src"
        / "gps_denied_onboard"
        / "helpers"
        / "imu_preintegrator.py"
    )
    # Decode explicitly so the scan does not depend on the platform's default
    # locale encoding.
    source = module_path.read_text(encoding="utf-8")

    # Act / Assert — no Lock / RLock / Semaphore / mutex appears in source.
    for symbol in ("threading.Lock", "threading.RLock", "Semaphore", "mutex"):
        assert symbol not in source, f"imu_preintegrator must be lock-free (found {symbol!r})"

    # The dotted spellings above cannot see `from threading import Lock`, so
    # inspect the import statements for that form as well.
    bare_lock_imports: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.level == 0 and node.module == "threading":
            bare_lock_imports.extend(
                alias.name for alias in node.names if alias.name in ("Lock", "RLock")
            )
    assert (
        not bare_lock_imports
    ), f"imu_preintegrator must be lock-free (found {bare_lock_imports!r})"
# ---------------------------------------------------------------------------
# AC-7: no upward imports.
def test_ac7_no_upward_imports() -> None:
    """AC-7: ``helpers/imu_preintegrator.py`` must not import from ``components``.

    The module is parsed with ``ast`` and every import statement is checked,
    wherever it appears in the file. Besides the fully dotted forms, the check
    also covers ``from gps_denied_onboard import components`` and relative
    imports, which are resolved against the ``gps_denied_onboard.helpers``
    package the module lives in.
    """
    # Arrange
    module_path = (
        Path(__file__).resolve().parents[2]
        / "src"
        / "gps_denied_onboard"
        / "helpers"
        / "imu_preintegrator.py"
    )
    # Decode explicitly so parsing does not depend on the locale's default encoding.
    tree = ast.parse(module_path.read_text(encoding="utf-8"))
    marker = "gps_denied_onboard.components"
    # Package containing the scanned module; relative imports resolve against it.
    package = ("gps_denied_onboard", "helpers")

    # Act
    forbidden: list[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            forbidden.extend(alias.name for alias in node.names if marker in alias.name)
        elif isinstance(node, ast.ImportFrom):
            # level 0 is absolute; level 1 is the containing package and every
            # further level climbs one parent.
            if node.level:
                parts = list(package[: max(len(package) - node.level + 1, 0)])
            else:
                parts = []
            if node.module:
                parts.append(node.module)
            module = ".".join(parts)
            if marker in module:
                forbidden.append(module)
                continue
            # `from gps_denied_onboard import components` names the forbidden
            # package in the imported-name list rather than in the module path.
            forbidden.extend(
                qualified
                for qualified in (f"{module}.{alias.name}" for alias in node.names)
                if marker in qualified
            )

    # Assert
    assert not forbidden, f"imu_preintegrator must not import components.*: {forbidden}"
# ---------------------------------------------------------------------------
# Additional guards.
def test_current_preintegration_after_reset_with_bias_raises() -> None:
    """``reset_with_bias`` discards integrated samples, so reading the factor must fail."""
    # Arrange — integrate a few samples, then reset with an all-zero bias.
    preintegrator = make_imu_preintegrator(_calibration())
    for sample in _make_samples(5):
        preintegrator.integrate_sample(sample)
    zero_bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
    preintegrator.reset_with_bias(zero_bias)

    # Act / Assert — the accumulator was cleared along with the bias change.
    with pytest.raises(ImuPreintegrationError):
        preintegrator.current_preintegration()
def test_integrate_window_propagates_through_samples() -> None:
    """``integrate_window`` accumulates the full time span of the window's samples."""
    # Arrange
    preintegrator = make_imu_preintegrator(_calibration())
    samples = _make_samples(25)
    start_ns = samples[0].ts_ns
    end_ns = samples[-1].ts_ns
    window = ImuWindow(samples=samples, ts_start_ns=start_ns, ts_end_ns=end_ns)

    # Act
    preintegrator.integrate_window(window)

    # Assert — integrated time equals the first-to-last timestamp span.
    factor = preintegrator.current_preintegration()
    assert factor.deltaTij() == pytest.approx((end_ns - start_ns) * 1e-9)
def test_imu_preintegrator_is_an_instance_type() -> None:
    """The factory hands back an instance of the public ``ImuPreintegrator`` type."""
    # Arrange / Act
    built = make_imu_preintegrator(_calibration())

    # Assert — the returned object is of the documented public type.
    assert isinstance(built, ImuPreintegrator)
def test_combined_imu_factor_re_export_is_callable() -> None:
    """The ``helpers`` package re-exports ``CombinedImuFactor`` as a callable.

    The name check guards against the re-export being rebound to something
    else; the ``callable`` check covers what the test name promises.
    """
    # Assert — re-export resolves to GTSAM's CombinedImuFactor class.
    assert CombinedImuFactor.__name__ == "CombinedImuFactor"
    assert callable(CombinedImuFactor)