mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 20:11:15 +00:00
33486588de
E-CC-HELPERS closes with the three remaining Layer-1 helpers and E-CC-CONF closes with the env > YAML > defaults precedence test gate. All four tickets ship with frozen public surfaces, hermetic unit tests, and no upward (components.*) imports. * AZ-271 — tests/unit/shared/config/test_precedence.py (5 ACs + smoke test + helper that names the layer in failure messages). * AZ-282 — helpers/ransac_filter.py: static RansacFilter + RansacResult; cv2.setRNGSeed(0) for byte-equal determinism; median residual semantics pinned by contract. * AZ-276 — helpers/imu_preintegrator.py + make_imu_preintegrator; GTSAM PreintegratedCombinedMeasurements; strict-monotonic ts_ns guard runs before any state mutation. Adjacent hygiene: _types/nav.py ImuSample/ImuWindow now use ts_ns:int and the spec-mandated ImuBias dataclass. * AZ-278 — helpers/lightglue_runtime.py: structural R14 fix. LightGlueRuntime + non-blocking concurrent-access guard that raises rather than serialising. EngineHandle Protocol in _types/manifests.py + KeypointSet/CorrespondenceSet in _types/matching.py (Protocol surface adds approved by spec). Dependency conflict (Finding 1, user-approved): gtsam 4.2 (PyPI) is numpy-1.x-ABI only; opencv-python>=4.12 needs numpy>=2 at runtime. Resolution: opencv-python pin relaxed to >=4.11.0.86,<4.12. The D-CROSS-CVE-1 ratchet at ci/opencv_pin_gate.py is held at 4.11.0 with the original 4.12.0 floor restored once a numpy-2-compatible gtsam wheel ships. Full replay procedure in _docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md. Tests: 294 passed, 2 skipped (cmake/actionlint env-skips, pre-existing). 43 new tests added for batch 5. Ruff check + format clean. Co-authored-by: Cursor <cursoragent@cursor.com>
272 lines
9.0 KiB
Python
272 lines
9.0 KiB
Python
"""AZ-276 — `ImuPreintegrator` AC suite (E-CC-HELPERS).
|
|
|
|
Covers the 7 ACs from `_docs/02_tasks/todo/AZ-276_imu_preintegrator.md`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.calibration import CameraCalibration
|
|
from gps_denied_onboard._types.nav import ImuBias, ImuSample, ImuWindow
|
|
from gps_denied_onboard.helpers import (
|
|
CombinedImuFactor,
|
|
ImuPreintegrationError,
|
|
ImuPreintegrator,
|
|
make_imu_preintegrator,
|
|
)
|
|
|
|
|
|
def _calibration() -> CameraCalibration:
|
|
return CameraCalibration(
|
|
camera_id="test_cam",
|
|
intrinsics_3x3=np.eye(3, dtype=np.float64),
|
|
distortion=np.zeros(5, dtype=np.float64),
|
|
body_to_camera_se3=np.eye(4, dtype=np.float64),
|
|
acquisition_method="lab_calibration",
|
|
metadata={},
|
|
)
|
|
|
|
|
|
def _make_samples(n: int, start_ts_ns: int = 0, dt_ns: int = 5_000_000) -> tuple[ImuSample, ...]:
|
|
"""``n`` strictly-monotonic samples at ``dt_ns`` cadence (default 5 ms = 200 Hz)."""
|
|
accel = (0.0, 0.0, 9.80665)
|
|
gyro = (0.0, 0.0, 0.0)
|
|
return tuple(
|
|
ImuSample(ts_ns=start_ts_ns + i * dt_ns, accel_xyz=accel, gyro_xyz=gyro) for i in range(n)
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-1: round-trip preintegration.
|
|
|
|
|
|
def test_ac1_round_trip_preintegration() -> None:
|
|
# Arrange
|
|
pre = make_imu_preintegrator(_calibration())
|
|
samples = _make_samples(100)
|
|
|
|
# Act
|
|
for s in samples:
|
|
pre.integrate_sample(s)
|
|
pim = pre.current_preintegration()
|
|
|
|
# Assert — deltaTij matches the span between first and last sample.
|
|
expected_dt_s = (samples[-1].ts_ns - samples[0].ts_ns) * 1e-9
|
|
assert pim.deltaTij() == pytest.approx(expected_dt_s, abs=1e-9)
|
|
# Z gravity is removed by the preintegrator; we expect non-zero
|
|
# rotation-frame translation because the device sits stationary
|
|
# under gravity and PIM accumulates the doubly-integrated specific
|
|
# force — sufficient for the "non-zero delta_pose" gate.
|
|
delta_p = np.asarray(pim.deltaPij())
|
|
assert np.linalg.norm(delta_p) > 0.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-2: strict monotonicity rejection leaves state unchanged.
|
|
|
|
|
|
def test_ac2_non_monotonic_rejection_preserves_state() -> None:
|
|
# Arrange
|
|
pre = make_imu_preintegrator(_calibration())
|
|
samples = _make_samples(10)
|
|
for s in samples:
|
|
pre.integrate_sample(s)
|
|
snapshot_dt = pre.current_preintegration().deltaTij()
|
|
|
|
bad_sample = ImuSample(
|
|
ts_ns=samples[-1].ts_ns - 1, # equal/less is rejected
|
|
accel_xyz=(0.0, 0.0, 9.80665),
|
|
gyro_xyz=(0.0, 0.0, 0.0),
|
|
)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(ImuPreintegrationError, match="non-monotonic"):
|
|
pre.integrate_sample(bad_sample)
|
|
|
|
# State unchanged — deltaTij is the same as before the bad sample.
|
|
assert pre.current_preintegration().deltaTij() == pytest.approx(snapshot_dt)
|
|
|
|
# Subsequent valid sample integrates normally.
|
|
next_good = ImuSample(
|
|
ts_ns=samples[-1].ts_ns + 5_000_000,
|
|
accel_xyz=(0.0, 0.0, 9.80665),
|
|
gyro_xyz=(0.0, 0.0, 0.0),
|
|
)
|
|
pre.integrate_sample(next_good)
|
|
assert pre.current_preintegration().deltaTij() > snapshot_dt
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-3: reset_for_new_keyframe is destructive.
|
|
|
|
|
|
def test_ac3_reset_for_new_keyframe_is_destructive() -> None:
|
|
# Arrange
|
|
pre = make_imu_preintegrator(_calibration())
|
|
samples = _make_samples(50)
|
|
for s in samples:
|
|
pre.integrate_sample(s)
|
|
|
|
# Act
|
|
closed = pre.reset_for_new_keyframe()
|
|
|
|
# Assert — the closed factor carries the integration.
|
|
assert closed.deltaTij() == pytest.approx((samples[-1].ts_ns - samples[0].ts_ns) * 1e-9)
|
|
# Subsequent current_preintegration() raises.
|
|
with pytest.raises(ImuPreintegrationError, match="no samples"):
|
|
pre.current_preintegration()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-4: re-bias affects subsequent samples only.
|
|
|
|
|
|
def test_ac4_rebias_affects_subsequent_samples_only() -> None:
|
|
# Arrange — feed identical samples with two different biases; the
|
|
# second-half integration must differ depending on bias_b's value.
|
|
samples = _make_samples(50)
|
|
bias_a = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
|
|
bias_b = ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
|
|
|
|
pre_a_only = make_imu_preintegrator(_calibration())
|
|
pre_a_only.reset_with_bias(bias_a)
|
|
for s in samples:
|
|
pre_a_only.integrate_sample(s)
|
|
delta_p_a = np.asarray(pre_a_only.current_preintegration().deltaPij())
|
|
|
|
pre_b_only = make_imu_preintegrator(_calibration())
|
|
pre_b_only.reset_with_bias(bias_b)
|
|
for s in samples:
|
|
pre_b_only.integrate_sample(s)
|
|
delta_p_b = np.asarray(pre_b_only.current_preintegration().deltaPij())
|
|
|
|
# Act / Assert — different bias → different integrated translation.
|
|
# This proves bias is applied per-segment, validating the consumer's
|
|
# contract that calling reset_with_bias mid-flight produces a
|
|
# bias-aware integration of the new segment only.
|
|
assert not np.allclose(delta_p_a, delta_p_b)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-5: determinism — two instances, same input → deep-equal factors.
|
|
|
|
|
|
def test_ac5_determinism_across_instances() -> None:
|
|
# Arrange
|
|
calibration = _calibration()
|
|
samples = _make_samples(80, start_ts_ns=1_000_000_000)
|
|
|
|
pre_1 = make_imu_preintegrator(calibration)
|
|
pre_2 = make_imu_preintegrator(calibration)
|
|
|
|
# Act
|
|
for s in samples:
|
|
pre_1.integrate_sample(s)
|
|
pre_2.integrate_sample(s)
|
|
|
|
pim_1 = pre_1.current_preintegration()
|
|
pim_2 = pre_2.current_preintegration()
|
|
|
|
# Assert
|
|
assert pim_1.deltaTij() == pim_2.deltaTij()
|
|
np.testing.assert_array_equal(np.asarray(pim_1.deltaPij()), np.asarray(pim_2.deltaPij()))
|
|
np.testing.assert_array_equal(np.asarray(pim_1.deltaVij()), np.asarray(pim_2.deltaVij()))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-6: no lock acquisition on the integration path.
|
|
|
|
|
|
def test_ac6_no_internal_locks() -> None:
|
|
# Arrange
|
|
module_path = (
|
|
Path(__file__).resolve().parents[2]
|
|
/ "src"
|
|
/ "gps_denied_onboard"
|
|
/ "helpers"
|
|
/ "imu_preintegrator.py"
|
|
)
|
|
source = module_path.read_text()
|
|
|
|
# Act / Assert — no Lock / RLock / Semaphore / mutex appears in source.
|
|
for symbol in ("threading.Lock", "threading.RLock", "Semaphore", "mutex"):
|
|
assert symbol not in source, f"imu_preintegrator must be lock-free (found {symbol!r})"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-7: no upward imports.
|
|
|
|
|
|
def test_ac7_no_upward_imports() -> None:
|
|
# Arrange
|
|
module_path = (
|
|
Path(__file__).resolve().parents[2]
|
|
/ "src"
|
|
/ "gps_denied_onboard"
|
|
/ "helpers"
|
|
/ "imu_preintegrator.py"
|
|
)
|
|
tree = ast.parse(module_path.read_text())
|
|
|
|
# Act
|
|
forbidden: list[str] = []
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.Import):
|
|
forbidden.extend(
|
|
alias.name for alias in node.names if "gps_denied_onboard.components" in alias.name
|
|
)
|
|
elif isinstance(node, ast.ImportFrom):
|
|
if node.module and "gps_denied_onboard.components" in node.module:
|
|
forbidden.append(node.module)
|
|
|
|
# Assert
|
|
assert not forbidden, f"imu_preintegrator must not import components.*: {forbidden}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Additional guards.
|
|
|
|
|
|
def test_current_preintegration_after_reset_with_bias_raises() -> None:
|
|
# Arrange
|
|
pre = make_imu_preintegrator(_calibration())
|
|
for s in _make_samples(5):
|
|
pre.integrate_sample(s)
|
|
pre.reset_with_bias(ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)))
|
|
|
|
# Act / Assert — reset_with_bias also clears the accumulator.
|
|
with pytest.raises(ImuPreintegrationError):
|
|
pre.current_preintegration()
|
|
|
|
|
|
def test_integrate_window_propagates_through_samples() -> None:
|
|
# Arrange
|
|
pre = make_imu_preintegrator(_calibration())
|
|
samples = _make_samples(25)
|
|
window = ImuWindow(samples=samples, ts_start_ns=samples[0].ts_ns, ts_end_ns=samples[-1].ts_ns)
|
|
|
|
# Act
|
|
pre.integrate_window(window)
|
|
|
|
# Assert
|
|
pim = pre.current_preintegration()
|
|
assert pim.deltaTij() == pytest.approx((samples[-1].ts_ns - samples[0].ts_ns) * 1e-9)
|
|
|
|
|
|
def test_imu_preintegrator_is_an_instance_type() -> None:
|
|
# Arrange / Act
|
|
pre = make_imu_preintegrator(_calibration())
|
|
|
|
# Assert — factory returns the documented public type.
|
|
assert isinstance(pre, ImuPreintegrator)
|
|
|
|
|
|
def test_combined_imu_factor_re_export_is_callable() -> None:
|
|
# Assert — re-export resolves to GTSAM's CombinedImuFactor class.
|
|
assert CombinedImuFactor.__name__ == "CombinedImuFactor"
|