Files
Oleksandr Bezdieniezhnykh 6c7d24f7e0 [AZ-331] C1 VioStrategy: Protocol + DTOs + factory + C5 migration
Freezes the c1_vio Public API per
_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md v1.0.0:

- VioStrategy Protocol (4 methods: process_frame, reset_to_warm_start,
  health_snapshot, current_strategy_label) in
  components/c1_vio/interface.py.
- DTOs (VioOutput, VioHealth, FeatureQuality, WarmStartPose) + VioState
  enum in _types/nav.py — L1 placement so C5 + C13 consume them without
  crossing the components.* boundary (AZ-270 AC-6). The new VioOutput
  shape (frame_id: str, relative_pose_T: gtsam.Pose3,
  pose_covariance_6x6, imu_bias, feature_quality, emitted_at_ns)
  replaces the AZ-263 scaffolding in _types/vio.py, which is now
  deleted.
- VioError family (VioInitializingError / VioDegradedError /
  VioFatalError) in components/c1_vio/errors.py. Documented
  rationale: the degraded-operation path returns a VioOutput with
  inflated covariance + VioHealth.state=DEGRADED rather than raising
  VioDegradedError — the error type exists only for the rare
  degraded->fatal transition.
- C1VioConfig per-component config block (strategy enum,
  lost_frame_threshold default 9, warm_start_max_frames default 5)
  with constructor-time validation rejecting unknown strategy labels.
- StrategyNotAvailableError added to runtime_root/errors.py;
  composition-time error distinct from the VioError family.
- Composition-root factory build_vio_strategy in
  runtime_root/vio_factory.py with three BUILD_* gates (BUILD_OKVIS2,
  BUILD_VINS_MONO, BUILD_KLT_RANSAC). Concrete strategy modules are
  imported lazily via __import__ AFTER the flag check — Tier-0
  workstation builds with the flag OFF MUST NOT load the strategy
  module (Risk-2 / I-5; verifiable via sys.modules).
- 36 conformance tests cover all 9 ACs + NFR-perf-factory
  (p99 build under 200 ms x 1000 calls) + NFR-reliability-error-family.
  AC-8 introspects the contract file's Shape table and asserts method
  parity against the runtime Protocol; AC-9 asserts the frame_id
  annotation is 'str' (PEP-563 stringified).

C5 migration (consumers of the new VioOutput shape):
- gtsam_isam2_estimator.py + eskf_baseline.py: replaced
  vio.timestamp -> vio.emitted_at_ns (drops _datetime_to_ns on the
  VIO path), vio.pose_se3 -> vio.relative_pose_T (gtsam.Pose3 direct;
  drops _pose_se3_to_gtsam / _pose_se3_to_array), vio.covariance_6x6
  -> vio.pose_covariance_6x6 (rename).
- key_for_frame signature widened to UUID | int | str to accept the
  new str frame_id.
- 4 C5 test files migrated to the new VioOutput shape with helper
  fixtures producing ImuBias + FeatureQuality + str frame_id.
- c5_state/interface.py TYPE_CHECKING import path updated.

Bootstrap healthcheck + test_types_importable updated to drop the
deleted _types/vio module and pick up _types/inference (AZ-297) in
the same sweep.

Full unit-test sweep: 884 passed, 2 pre-existing environment skips
(cmake, actionlint).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 04:44:31 +03:00

512 lines
16 KiB
Python

"""AZ-386 — `EskfStateEstimator` mandatory simple-baseline.
Covers AC-1..AC-10 from ``_docs/02_tasks/todo/AZ-386_c5_eskf_baseline.md``.
The tests construct an estimator directly (no GTSAM required) and
exercise:
- AC-1: Protocol conformance (``isinstance`` against ``StateEstimator``).
- AC-2: Prediction step accuracy on a stationary synthetic IMU
sequence (drift < 1 m over 5 s of zero specific force).
- AC-3: ``add_vio`` updates state + shrinks covariance over
consistent measurements.
- AC-4: ``add_pose_anchor`` integrates BOTH ``marginals`` and
``jacobian`` modes (no skip — ESKF has no graph).
- AC-5: Every emitted ``covariance_6x6`` is SPD; injected non-SPD
raises ``EstimatorFatalError``.
- AC-6: ``smoothed_history`` returns ``smoothed=False`` entries.
- AC-7: ``BUILD_STATE_ESKF=OFF`` rejected by the factory.
- AC-8: ``SourceLabelStateMachine`` is auto-wired (label updates on
satellite anchor).
- AC-9: Filter divergence handling raises ``EstimatorFatalError``.
- AC-10: ``config.strategy="eskf"`` + ``BUILD_STATE_ESKF=ON``
resolves to ``EskfStateEstimator`` via the runtime root factory.
"""
from __future__ import annotations
import dataclasses
from datetime import datetime, timezone
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.nav import (
FeatureQuality,
ImuBias,
ImuSample,
ImuWindow,
VioOutput,
)
from gps_denied_onboard._types.pose import CovarianceMode, PoseEstimate, Quat
from gps_denied_onboard._types.state import IsamState, PoseSourceLabel
from gps_denied_onboard.components.c5_state import (
C5StateConfig,
EstimatorFatalError,
StateEstimator,
StateEstimatorConfigError,
)
from gps_denied_onboard.components.c5_state.eskf_baseline import (
EskfStateEstimator,
create,
register,
)
from gps_denied_onboard.config import load_config
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.runtime_root.state_factory import (
build_state_estimator,
clear_state_ingest_binding,
clear_state_registry,
)
# ----------------------------------------------------------------------
# Fixtures + helpers.
@pytest.fixture(autouse=True)
def _isolation():
clear_state_registry()
clear_state_ingest_binding()
yield
clear_state_registry()
clear_state_ingest_binding()
def _build_config(**state_overrides: Any) -> Config:
cfg = load_config(env={}, paths=(), require_env=False)
new_state = dataclasses.replace(C5StateConfig(strategy="eskf"), **state_overrides)
components = dict(cfg.components or {})
components["c5_state"] = new_state
return dataclasses.replace(cfg, components=components)
def _make_estimator(**kwargs: Any) -> EskfStateEstimator:
cfg = _build_config(**kwargs)
return EskfStateEstimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
def _identity_pose() -> np.ndarray:
return np.eye(4, dtype=np.float64)
def _pose_translated(x: float, y: float, z: float) -> np.ndarray:
p = np.eye(4, dtype=np.float64)
p[0, 3] = x
p[1, 3] = y
p[2, 3] = z
return p
_ESKF_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
_ESKF_NEUTRAL_FQ = FeatureQuality(
tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0,
)
def _vio(
frame_id: int,
ts_ns: int,
pose: np.ndarray,
cov: np.ndarray | None = None,
) -> VioOutput:
import gtsam
return VioOutput(
frame_id=str(frame_id),
relative_pose_T=gtsam.Pose3(pose),
pose_covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
imu_bias=_ESKF_ZERO_BIAS,
feature_quality=_ESKF_NEUTRAL_FQ,
emitted_at_ns=int(ts_ns),
)
_ENU_ORIGIN = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
def _rot_to_quat(R: np.ndarray) -> Quat:
"""3x3 rotation → unit quaternion (w,x,y,z); standard robust formula."""
trace = R[0, 0] + R[1, 1] + R[2, 2]
if trace > 0:
s = 0.5 / np.sqrt(trace + 1.0)
w = 0.25 / s
x = (R[2, 1] - R[1, 2]) * s
y = (R[0, 2] - R[2, 0]) * s
z = (R[1, 0] - R[0, 1]) * s
elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
s = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2])
w = (R[2, 1] - R[1, 2]) / s
x = 0.25 * s
y = (R[0, 1] + R[1, 0]) / s
z = (R[0, 2] + R[2, 0]) / s
elif R[1, 1] > R[2, 2]:
s = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2])
w = (R[0, 2] - R[2, 0]) / s
x = (R[0, 1] + R[1, 0]) / s
y = 0.25 * s
z = (R[1, 2] + R[2, 1]) / s
else:
s = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1])
w = (R[1, 0] - R[0, 1]) / s
x = (R[0, 2] + R[2, 0]) / s
y = (R[1, 2] + R[2, 1]) / s
z = 0.25 * s
return Quat(w=float(w), x=float(x), y=float(y), z=float(z))
def _pose_anchor(
frame_id: int,
ts_ns: int,
pose: np.ndarray,
cov: np.ndarray | None = None,
mode: CovarianceMode | str = CovarianceMode.MARGINALS,
) -> PoseEstimate:
rotation = pose[:3, :3]
translation = np.ascontiguousarray(pose[:3, 3], dtype=np.float64)
position = WgsConverter.local_enu_to_latlonalt(_ENU_ORIGIN, translation)
quat = _rot_to_quat(rotation)
mode_enum = mode if isinstance(mode, CovarianceMode) else CovarianceMode(mode.lower())
return PoseEstimate(
frame_id=UUID(int=frame_id),
position_wgs84=position,
orientation_world_T_body=quat,
covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
covariance_mode=mode_enum,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=ts_ns,
)
def _imu_window(
*, ts_start_ns: int, dt_s: float, n_samples: int, accel: tuple, gyro: tuple
) -> ImuWindow:
samples = tuple(
ImuSample(
ts_ns=ts_start_ns + int(i * dt_s * 1_000_000_000),
accel_xyz=accel,
gyro_xyz=gyro,
)
for i in range(n_samples)
)
return ImuWindow(
samples=samples,
ts_start_ns=samples[0].ts_ns,
ts_end_ns=samples[-1].ts_ns,
)
# ----------------------------------------------------------------------
# AC-1: Protocol conformance.
def test_ac1_eskf_satisfies_state_estimator_protocol() -> None:
estimator = _make_estimator()
assert isinstance(estimator, StateEstimator)
# ----------------------------------------------------------------------
# AC-2: Prediction step accuracy.
def test_ac2_prediction_drift_under_one_metre_over_five_seconds() -> None:
# Arrange — stationary FC: specific force = -gravity in body frame
# (so world-frame accel is 0). Zero gyro. 100 Hz over 5 s.
estimator = _make_estimator()
accel_body_stationary = (0.0, 0.0, 9.80665)
window = _imu_window(
ts_start_ns=1_000_000_000,
dt_s=0.01,
n_samples=501,
accel=accel_body_stationary,
gyro=(0.0, 0.0, 0.0),
)
# Act
estimator.add_fc_imu(window)
# Assert — under perfect zero specific force the nominal
# position must remain near the origin. Forward Euler accrues a
# small residual from quaternion-renormalisation noise.
drift = float(np.linalg.norm(estimator._nominal_pos))
assert drift < 1.0
# ----------------------------------------------------------------------
# AC-3: Relative-pose update via add_vio.
def test_ac3_add_vio_updates_state_and_covariance_shrinks() -> None:
# Arrange — two consistent VIO frames; first frame seeds, second
# frame measures a 1 m forward delta.
estimator = _make_estimator()
cov_before_first = estimator._pose_covariance_6x6().copy()
estimator.add_vio(_vio(1, 1_000_000_000, _identity_pose()))
estimator.add_vio(_vio(2, 1_100_000_000, _pose_translated(1.0, 0.0, 0.0)))
# Assert
cov_after = estimator._pose_covariance_6x6()
assert np.linalg.norm(cov_after, ord="fro") <= np.linalg.norm(cov_before_first, ord="fro")
# ----------------------------------------------------------------------
# AC-4: Absolute-pose update; mode doesn't skip ESKF.
def test_ac4_add_pose_anchor_runs_for_marginals_and_jacobian() -> None:
# Arrange — two anchors, one in each mode; both must shift the
# nominal position toward the measurement.
estimator_m = _make_estimator()
estimator_j = _make_estimator()
pose_m = _pose_anchor(1, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="marginals")
pose_j = _pose_anchor(2, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="jacobian")
# Act
estimator_m.add_pose_anchor(pose_m)
estimator_j.add_pose_anchor(pose_j)
# Assert — both modes move the nominal toward the measurement.
assert estimator_m._nominal_pos[0] > 0.1
assert estimator_j._nominal_pos[0] > 0.1
assert estimator_m._last_anchor_ns > 0
assert estimator_j._last_anchor_ns > 0
def test_ac4_pose_anchor_sets_last_anchor_ns() -> None:
estimator = _make_estimator()
pose = _pose_anchor(1, 1_000_000_000, _pose_translated(1.0, 0.0, 0.0))
estimator.add_pose_anchor(pose)
assert estimator._last_anchor_ns > 0
# ----------------------------------------------------------------------
# AC-5: SPD covariance enforcement.
def test_ac5_current_estimate_emits_spd_covariance() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert out.covariance_6x6.shape == (6, 6)
np.linalg.cholesky(out.covariance_6x6)
def test_ac5_non_spd_covariance_raises_fatal() -> None:
# Arrange — force the internal covariance to be indefinite.
estimator = _make_estimator()
P = np.eye(16, dtype=np.float64)
P[0, 0] = -1.0 # negative variance on x-position
estimator._P = P
# Act / Assert
with pytest.raises(EstimatorFatalError, match="not SPD"):
estimator.current_estimate()
assert estimator._isam2_state == IsamState.LOST
# ----------------------------------------------------------------------
# AC-6: smoothed_history honesty.
def test_ac6_smoothed_history_returns_smoothed_false_entries() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
estimator.current_estimate()
estimator.add_pose_anchor(_pose_anchor(2, 1_100_000_000, _pose_translated(0.5, 0.0, 0.0)))
estimator.current_estimate()
history = estimator.smoothed_history(n_keyframes=10)
assert len(history) == 2
assert all(entry.smoothed is False for entry in history)
def test_ac6_smoothed_history_empty_buffer_returns_empty() -> None:
estimator = _make_estimator()
assert estimator.smoothed_history(10) == []
def test_ac6_smoothed_history_n_zero_returns_empty() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
estimator.current_estimate()
assert estimator.smoothed_history(0) == []
# ----------------------------------------------------------------------
# AC-7: BUILD_STATE_ESKF=OFF rejection.
def test_ac7_build_state_eskf_off_rejected(monkeypatch: pytest.MonkeyPatch) -> None:
register()
monkeypatch.setenv("BUILD_STATE_ESKF", "OFF")
cfg = _build_config()
with pytest.raises(StateEstimatorConfigError, match="BUILD_STATE_ESKF is OFF"):
build_state_estimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
# ----------------------------------------------------------------------
# AC-8: SourceLabelStateMachine integration.
def test_ac8_source_label_machine_is_auto_wired() -> None:
# Arrange — gate is open by default; the first satellite anchor
# transitions the label DEAD_RECKONED → SATELLITE_ANCHORED.
estimator = _make_estimator()
assert estimator.current_estimate().source_label == PoseSourceLabel.DEAD_RECKONED
# Act
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out_after = estimator.current_estimate()
# Assert
assert out_after.source_label == PoseSourceLabel.SATELLITE_ANCHORED
def test_ac8_spoof_status_closes_gate_and_rejects_anchor() -> None:
# Arrange — SPOOFED GPS latches the gate closed; next anchor is
# a promotion attempt that fails (no consistency check possible
# because the estimator passes ``gps_consistency_delta_m=None``).
estimator = _make_estimator()
received: list[Any] = []
estimator.subscribe_spoof_rejection(
lambda reason, severity, text: received.append((reason, severity, text))
)
estimator.notify_gps_health(_gps_status(GpsStatus.SPOOFED), now_ns=0)
# Act
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
# Assert
assert len(received) == 1
assert estimator.current_estimate().source_label == PoseSourceLabel.VISUAL_PROPAGATED
# ----------------------------------------------------------------------
# AC-9: Filter divergence handling.
def test_ac9_filter_divergence_raises_fatal() -> None:
# Arrange — feed a pose anchor whose measured position is wildly
# inconsistent with the nominal state, with a tight measurement
# covariance. innov_mag = 1000 m; sqrt(||R||_F) ≈ 1e-2 → ratio ≫ 10.
estimator = _make_estimator()
far_pose = _pose_translated(1000.0, 0.0, 0.0)
tight_cov = np.eye(6) * 1e-6
# Act / Assert
with pytest.raises(EstimatorFatalError, match="filter divergence"):
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, far_pose, cov=tight_cov))
assert estimator._isam2_state == IsamState.LOST
# ----------------------------------------------------------------------
# AC-10: Composition wiring through the runtime root.
def test_ac10_factory_resolves_eskf_strategy(monkeypatch: pytest.MonkeyPatch) -> None:
register()
monkeypatch.setenv("BUILD_STATE_ESKF", "ON")
cfg = _build_config(strategy="eskf")
estimator, handle = build_state_estimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
assert isinstance(estimator, EskfStateEstimator)
assert handle is None
def test_ac10_module_level_create_returns_protocol_and_none_handle() -> None:
cfg = _build_config()
estimator, handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
assert isinstance(estimator, StateEstimator)
assert handle is None
# ----------------------------------------------------------------------
# Other robustness checks.
def test_health_snapshot_reports_init_then_tracking() -> None:
estimator = _make_estimator()
assert estimator.health_snapshot().isam2_state == IsamState.INIT
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
assert estimator.health_snapshot().isam2_state == IsamState.TRACKING
def test_current_estimate_returns_uuid_frame_id_and_increments_keyframes() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert isinstance(out.frame_id, type(uuid4()))
assert estimator.health_snapshot().keyframe_count == 1
def test_out_of_order_vio_raises_degraded() -> None:
from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError
estimator = _make_estimator()
estimator.add_vio(_vio(1, 2_000_000_000, _identity_pose()))
with pytest.raises(EstimatorDegradedError, match="out-of-order"):
estimator.add_vio(_vio(2, 1_000_000_000, _identity_pose()))
def test_pose_anchor_sets_velocity_to_zero_initially() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert out.velocity_world_mps == (0.0, 0.0, 0.0)
# ----------------------------------------------------------------------
# Helpers for AC-8 GPS health construction.
def _gps_status(status: GpsStatus) -> GpsHealth:
return GpsHealth(status=status, fix_age_ms=0, captured_at=0)