Files
gps-denied-onboard/tests/unit/c5_state/test_az386_eskf_baseline.py
T
Oleksandr Bezdieniezhnykh db27e25630 [AZ-355] C4 PoseEstimator Protocol + factory + DTOs + composition
Land the foundational C4 surface AZ-358 (Marginals) and AZ-361
(Hybrid) build on top of:

- PoseEstimator Protocol (@runtime_checkable): estimate(...) +
  current_covariance_mode().
- Error hierarchy: PoseEstimatorError, PnpFailureError,
  PoseEstimatorConfigError; CovarianceDegradedWarning as a Warning
  subclass (warnings.warn path, not raised).
- ISam2GraphHandle Protocol stub (READ-ONLY view, get_pose_key only)
  decoupled from C5's concrete ISam2GraphHandleImpl.
- C4PoseConfig (frozen dataclass) + register on c4_pose import.
- runtime_root/pose_factory.build_pose_estimator with lazy-import
  fallback; INFO log c4.pose.strategy_loaded; shares ingest-thread
  binding with C5 per ADR-003.

DTO restructuring (cross-cutting): retire the legacy raw-4x4
PoseEstimate(int frame_id, datetime timestamp, pose_se3, ...) and
ship the contract shape PoseEstimate(UUID, LatLonAlt, Quat,
np.ndarray, CovarianceMode, PoseSourceLabel,
last_satellite_anchor_age_ms, emitted_at). C5 add_pose_anchor in
both gtsam_isam2 + eskf_baseline migrated in lockstep via
WGS84->ENU + Quat->R helpers; test fixtures updated. VIO output
stays on the raw shape until AZ-331 (C1 protocol) lands.

LatLonAlt upgraded to slots=True per AC-2. ThermalState stub added
to _types/thermal.py so the Protocol typechecks pre-AZ-302.

Tests: 25 new in tests/unit/c4_pose/test_az355_pose_protocol.py
covering AC-1..AC-10 + factory wiring + config validation; full
repo: 685 passed, 2 pre-existing CI-only skips.

Jira transition deferred: MCP "Not connected"; leftover entry in
_docs/_process_leftovers/2026-05-11_jira_transition_az355_deferred.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 10:32:14 +03:00

499 lines
16 KiB
Python

"""AZ-386 — `EskfStateEstimator` mandatory simple-baseline.
Covers AC-1..AC-10 from ``_docs/02_tasks/todo/AZ-386_c5_eskf_baseline.md``.
The tests construct an estimator directly (no GTSAM required) and
exercise:
- AC-1: Protocol conformance (``isinstance`` against ``StateEstimator``).
- AC-2: Prediction step accuracy on a stationary synthetic IMU
sequence (drift < 1 m over 5 s of zero specific force).
- AC-3: ``add_vio`` updates state + shrinks covariance over
consistent measurements.
- AC-4: ``add_pose_anchor`` integrates BOTH ``marginals`` and
``jacobian`` modes (no skip — ESKF has no graph).
- AC-5: Every emitted ``covariance_6x6`` is SPD; injected non-SPD
raises ``EstimatorFatalError``.
- AC-6: ``smoothed_history`` returns ``smoothed=False`` entries.
- AC-7: ``BUILD_STATE_ESKF=OFF`` rejected by the factory.
- AC-8: ``SourceLabelStateMachine`` is auto-wired (label updates on
satellite anchor).
- AC-9: Filter divergence handling raises ``EstimatorFatalError``.
- AC-10: ``config.strategy="eskf"`` + ``BUILD_STATE_ESKF=ON``
resolves to ``EskfStateEstimator`` via the runtime root factory.
"""
from __future__ import annotations
import dataclasses
from datetime import datetime, timezone
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.nav import ImuSample, ImuWindow
from gps_denied_onboard._types.pose import CovarianceMode, PoseEstimate, Quat
from gps_denied_onboard._types.state import IsamState, PoseSourceLabel
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard.components.c5_state import (
C5StateConfig,
EstimatorFatalError,
StateEstimator,
StateEstimatorConfigError,
)
from gps_denied_onboard.components.c5_state.eskf_baseline import (
EskfStateEstimator,
create,
register,
)
from gps_denied_onboard.config import load_config
from gps_denied_onboard.config.schema import Config
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.runtime_root.state_factory import (
build_state_estimator,
clear_state_ingest_binding,
clear_state_registry,
)
# ----------------------------------------------------------------------
# Fixtures + helpers.
@pytest.fixture(autouse=True)
def _isolation():
clear_state_registry()
clear_state_ingest_binding()
yield
clear_state_registry()
clear_state_ingest_binding()
def _build_config(**state_overrides: Any) -> Config:
cfg = load_config(env={}, paths=(), require_env=False)
new_state = dataclasses.replace(C5StateConfig(strategy="eskf"), **state_overrides)
components = dict(cfg.components or {})
components["c5_state"] = new_state
return dataclasses.replace(cfg, components=components)
def _make_estimator(**kwargs: Any) -> EskfStateEstimator:
cfg = _build_config(**kwargs)
return EskfStateEstimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
def _identity_pose() -> np.ndarray:
return np.eye(4, dtype=np.float64)
def _pose_translated(x: float, y: float, z: float) -> np.ndarray:
p = np.eye(4, dtype=np.float64)
p[0, 3] = x
p[1, 3] = y
p[2, 3] = z
return p
def _vio(
frame_id: int,
ts_ns: int,
pose: np.ndarray,
cov: np.ndarray | None = None,
) -> VioOutput:
# Default sigma ≈ 0.1 m / 0.1 rad — realistic VIO uncertainty.
return VioOutput(
frame_id=frame_id,
timestamp=datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc),
pose_se3=pose,
covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
)
_ENU_ORIGIN = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
def _rot_to_quat(R: np.ndarray) -> Quat:
"""3x3 rotation → unit quaternion (w,x,y,z); standard robust formula."""
trace = R[0, 0] + R[1, 1] + R[2, 2]
if trace > 0:
s = 0.5 / np.sqrt(trace + 1.0)
w = 0.25 / s
x = (R[2, 1] - R[1, 2]) * s
y = (R[0, 2] - R[2, 0]) * s
z = (R[1, 0] - R[0, 1]) * s
elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
s = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2])
w = (R[2, 1] - R[1, 2]) / s
x = 0.25 * s
y = (R[0, 1] + R[1, 0]) / s
z = (R[0, 2] + R[2, 0]) / s
elif R[1, 1] > R[2, 2]:
s = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2])
w = (R[0, 2] - R[2, 0]) / s
x = (R[0, 1] + R[1, 0]) / s
y = 0.25 * s
z = (R[1, 2] + R[2, 1]) / s
else:
s = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1])
w = (R[1, 0] - R[0, 1]) / s
x = (R[0, 2] + R[2, 0]) / s
y = (R[1, 2] + R[2, 1]) / s
z = 0.25 * s
return Quat(w=float(w), x=float(x), y=float(y), z=float(z))
def _pose_anchor(
frame_id: int,
ts_ns: int,
pose: np.ndarray,
cov: np.ndarray | None = None,
mode: CovarianceMode | str = CovarianceMode.MARGINALS,
) -> PoseEstimate:
rotation = pose[:3, :3]
translation = np.ascontiguousarray(pose[:3, 3], dtype=np.float64)
position = WgsConverter.local_enu_to_latlonalt(_ENU_ORIGIN, translation)
quat = _rot_to_quat(rotation)
mode_enum = mode if isinstance(mode, CovarianceMode) else CovarianceMode(mode.lower())
return PoseEstimate(
frame_id=UUID(int=frame_id),
position_wgs84=position,
orientation_world_T_body=quat,
covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
covariance_mode=mode_enum,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=ts_ns,
)
def _imu_window(
*, ts_start_ns: int, dt_s: float, n_samples: int, accel: tuple, gyro: tuple
) -> ImuWindow:
samples = tuple(
ImuSample(
ts_ns=ts_start_ns + int(i * dt_s * 1_000_000_000),
accel_xyz=accel,
gyro_xyz=gyro,
)
for i in range(n_samples)
)
return ImuWindow(
samples=samples,
ts_start_ns=samples[0].ts_ns,
ts_end_ns=samples[-1].ts_ns,
)
# ----------------------------------------------------------------------
# AC-1: Protocol conformance.
def test_ac1_eskf_satisfies_state_estimator_protocol() -> None:
estimator = _make_estimator()
assert isinstance(estimator, StateEstimator)
# ----------------------------------------------------------------------
# AC-2: Prediction step accuracy.
def test_ac2_prediction_drift_under_one_metre_over_five_seconds() -> None:
# Arrange — stationary FC: specific force = -gravity in body frame
# (so world-frame accel is 0). Zero gyro. 100 Hz over 5 s.
estimator = _make_estimator()
accel_body_stationary = (0.0, 0.0, 9.80665)
window = _imu_window(
ts_start_ns=1_000_000_000,
dt_s=0.01,
n_samples=501,
accel=accel_body_stationary,
gyro=(0.0, 0.0, 0.0),
)
# Act
estimator.add_fc_imu(window)
# Assert — under perfect zero specific force the nominal
# position must remain near the origin. Forward Euler accrues a
# small residual from quaternion-renormalisation noise.
drift = float(np.linalg.norm(estimator._nominal_pos))
assert drift < 1.0
# ----------------------------------------------------------------------
# AC-3: Relative-pose update via add_vio.
def test_ac3_add_vio_updates_state_and_covariance_shrinks() -> None:
# Arrange — two consistent VIO frames; first frame seeds, second
# frame measures a 1 m forward delta.
estimator = _make_estimator()
cov_before_first = estimator._pose_covariance_6x6().copy()
estimator.add_vio(_vio(1, 1_000_000_000, _identity_pose()))
estimator.add_vio(_vio(2, 1_100_000_000, _pose_translated(1.0, 0.0, 0.0)))
# Assert
cov_after = estimator._pose_covariance_6x6()
assert np.linalg.norm(cov_after, ord="fro") <= np.linalg.norm(cov_before_first, ord="fro")
# ----------------------------------------------------------------------
# AC-4: Absolute-pose update; mode doesn't skip ESKF.
def test_ac4_add_pose_anchor_runs_for_marginals_and_jacobian() -> None:
# Arrange — two anchors, one in each mode; both must shift the
# nominal position toward the measurement.
estimator_m = _make_estimator()
estimator_j = _make_estimator()
pose_m = _pose_anchor(1, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="marginals")
pose_j = _pose_anchor(2, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="jacobian")
# Act
estimator_m.add_pose_anchor(pose_m)
estimator_j.add_pose_anchor(pose_j)
# Assert — both modes move the nominal toward the measurement.
assert estimator_m._nominal_pos[0] > 0.1
assert estimator_j._nominal_pos[0] > 0.1
assert estimator_m._last_anchor_ns > 0
assert estimator_j._last_anchor_ns > 0
def test_ac4_pose_anchor_sets_last_anchor_ns() -> None:
estimator = _make_estimator()
pose = _pose_anchor(1, 1_000_000_000, _pose_translated(1.0, 0.0, 0.0))
estimator.add_pose_anchor(pose)
assert estimator._last_anchor_ns > 0
# ----------------------------------------------------------------------
# AC-5: SPD covariance enforcement.
def test_ac5_current_estimate_emits_spd_covariance() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert out.covariance_6x6.shape == (6, 6)
np.linalg.cholesky(out.covariance_6x6)
def test_ac5_non_spd_covariance_raises_fatal() -> None:
# Arrange — force the internal covariance to be indefinite.
estimator = _make_estimator()
P = np.eye(16, dtype=np.float64)
P[0, 0] = -1.0 # negative variance on x-position
estimator._P = P
# Act / Assert
with pytest.raises(EstimatorFatalError, match="not SPD"):
estimator.current_estimate()
assert estimator._isam2_state == IsamState.LOST
# ----------------------------------------------------------------------
# AC-6: smoothed_history honesty.
def test_ac6_smoothed_history_returns_smoothed_false_entries() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
estimator.current_estimate()
estimator.add_pose_anchor(_pose_anchor(2, 1_100_000_000, _pose_translated(0.5, 0.0, 0.0)))
estimator.current_estimate()
history = estimator.smoothed_history(n_keyframes=10)
assert len(history) == 2
assert all(entry.smoothed is False for entry in history)
def test_ac6_smoothed_history_empty_buffer_returns_empty() -> None:
estimator = _make_estimator()
assert estimator.smoothed_history(10) == []
def test_ac6_smoothed_history_n_zero_returns_empty() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
estimator.current_estimate()
assert estimator.smoothed_history(0) == []
# ----------------------------------------------------------------------
# AC-7: BUILD_STATE_ESKF=OFF rejection.
def test_ac7_build_state_eskf_off_rejected(monkeypatch: pytest.MonkeyPatch) -> None:
register()
monkeypatch.setenv("BUILD_STATE_ESKF", "OFF")
cfg = _build_config()
with pytest.raises(StateEstimatorConfigError, match="BUILD_STATE_ESKF is OFF"):
build_state_estimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
# ----------------------------------------------------------------------
# AC-8: SourceLabelStateMachine integration.
def test_ac8_source_label_machine_is_auto_wired() -> None:
# Arrange — gate is open by default; the first satellite anchor
# transitions the label DEAD_RECKONED → SATELLITE_ANCHORED.
estimator = _make_estimator()
assert estimator.current_estimate().source_label == PoseSourceLabel.DEAD_RECKONED
# Act
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out_after = estimator.current_estimate()
# Assert
assert out_after.source_label == PoseSourceLabel.SATELLITE_ANCHORED
def test_ac8_spoof_status_closes_gate_and_rejects_anchor() -> None:
# Arrange — SPOOFED GPS latches the gate closed; next anchor is
# a promotion attempt that fails (no consistency check possible
# because the estimator passes ``gps_consistency_delta_m=None``).
estimator = _make_estimator()
received: list[Any] = []
estimator.subscribe_spoof_rejection(
lambda reason, severity, text: received.append((reason, severity, text))
)
estimator.notify_gps_health(_gps_status(GpsStatus.SPOOFED), now_ns=0)
# Act
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
# Assert
assert len(received) == 1
assert estimator.current_estimate().source_label == PoseSourceLabel.VISUAL_PROPAGATED
# ----------------------------------------------------------------------
# AC-9: Filter divergence handling.
def test_ac9_filter_divergence_raises_fatal() -> None:
# Arrange — feed a pose anchor whose measured position is wildly
# inconsistent with the nominal state, with a tight measurement
# covariance. innov_mag = 1000 m; sqrt(||R||_F) ≈ 1e-2 → ratio ≫ 10.
estimator = _make_estimator()
far_pose = _pose_translated(1000.0, 0.0, 0.0)
tight_cov = np.eye(6) * 1e-6
# Act / Assert
with pytest.raises(EstimatorFatalError, match="filter divergence"):
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, far_pose, cov=tight_cov))
assert estimator._isam2_state == IsamState.LOST
# ----------------------------------------------------------------------
# AC-10: Composition wiring through the runtime root.
def test_ac10_factory_resolves_eskf_strategy(monkeypatch: pytest.MonkeyPatch) -> None:
register()
monkeypatch.setenv("BUILD_STATE_ESKF", "ON")
cfg = _build_config(strategy="eskf")
estimator, handle = build_state_estimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
assert isinstance(estimator, EskfStateEstimator)
assert handle is None
def test_ac10_module_level_create_returns_protocol_and_none_handle() -> None:
cfg = _build_config()
estimator, handle = create(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
assert isinstance(estimator, StateEstimator)
assert handle is None
# ----------------------------------------------------------------------
# Other robustness checks.
def test_health_snapshot_reports_init_then_tracking() -> None:
estimator = _make_estimator()
assert estimator.health_snapshot().isam2_state == IsamState.INIT
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
assert estimator.health_snapshot().isam2_state == IsamState.TRACKING
def test_current_estimate_returns_uuid_frame_id_and_increments_keyframes() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert isinstance(out.frame_id, type(uuid4()))
assert estimator.health_snapshot().keyframe_count == 1
def test_out_of_order_vio_raises_degraded() -> None:
from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError
estimator = _make_estimator()
estimator.add_vio(_vio(1, 2_000_000_000, _identity_pose()))
with pytest.raises(EstimatorDegradedError, match="out-of-order"):
estimator.add_vio(_vio(2, 1_000_000_000, _identity_pose()))
def test_pose_anchor_sets_velocity_to_zero_initially() -> None:
estimator = _make_estimator()
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
out = estimator.current_estimate()
assert out.velocity_world_mps == (0.0, 0.0, 0.0)
# ----------------------------------------------------------------------
# Helpers for AC-8 GPS health construction.
def _gps_status(status: GpsStatus) -> GpsHealth:
return GpsHealth(status=status, fix_age_ms=0, captured_at=0)