mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:11:13 +00:00
6c7d24f7e0
Freezes the c1_vio Public API per _docs/02_document/contracts/c1_vio/vio_strategy_protocol.md v1.0.0: - VioStrategy Protocol (4 methods: process_frame, reset_to_warm_start, health_snapshot, current_strategy_label) in components/c1_vio/interface.py. - DTOs (VioOutput, VioHealth, FeatureQuality, WarmStartPose) + VioState enum in _types/nav.py — L1 placement so C5 + C13 consume them without crossing the components.* boundary (AZ-270 AC-6). The new VioOutput shape (frame_id: str, relative_pose_T: gtsam.Pose3, pose_covariance_6x6, imu_bias, feature_quality, emitted_at_ns) replaces the AZ-263 scaffolding in _types/vio.py, which is now deleted. - VioError family (VioInitializingError / VioDegradedError / VioFatalError) in components/c1_vio/errors.py. Documented rationale: the degraded-operation path returns a VioOutput with inflated covariance + VioHealth.state=DEGRADED rather than raising VioDegradedError — the error type exists only for the rare degraded->fatal transition. - C1VioConfig per-component config block (strategy enum, lost_frame_threshold default 9, warm_start_max_frames default 5) with constructor-time validation rejecting unknown strategy labels. - StrategyNotAvailableError added to runtime_root/errors.py; composition-time error distinct from the VioError family. - Composition-root factory build_vio_strategy in runtime_root/vio_factory.py with three BUILD_* gates (BUILD_OKVIS2, BUILD_VINS_MONO, BUILD_KLT_RANSAC). Concrete strategy modules are imported lazily via __import__ AFTER the flag check — Tier-0 workstation builds with the flag OFF MUST NOT load the strategy module (Risk-2 / I-5; verifiable via sys.modules). - 36 conformance tests cover all 9 ACs + NFR-perf-factory (p99 build under 200 ms x 1000 calls) + NFR-reliability-error-family. AC-8 introspects the contract file's Shape table and asserts method parity against the runtime Protocol; AC-9 asserts the frame_id annotation is 'str' (PEP-563 stringified). C5 migration (consumers of the new VioOutput shape): - gtsam_isam2_estimator.py + eskf_baseline.py: replaced vio.timestamp -> vio.emitted_at_ns (drops _datetime_to_ns on the VIO path), vio.pose_se3 -> vio.relative_pose_T (gtsam.Pose3 direct; drops _pose_se3_to_gtsam / _pose_se3_to_array), vio.covariance_6x6 -> vio.pose_covariance_6x6 (rename). - key_for_frame signature widened to UUID | int | str to accept the new str frame_id. - 4 C5 test files migrated to the new VioOutput shape with helper fixtures producing ImuBias + FeatureQuality + str frame_id. - c5_state/interface.py TYPE_CHECKING import path updated. Bootstrap healthcheck + test_types_importable updated to drop the deleted _types/vio module and pick up _types/inference (AZ-297) in the same sweep. Full unit-test sweep: 884 passed, 2 pre-existing environment skips (cmake, actionlint). Co-authored-by: Cursor <cursoragent@cursor.com>
512 lines
16 KiB
Python
512 lines
16 KiB
Python
"""AZ-386 — `EskfStateEstimator` mandatory simple-baseline.
|
|
|
|
Covers AC-1..AC-10 from ``_docs/02_tasks/todo/AZ-386_c5_eskf_baseline.md``.
|
|
|
|
The tests construct an estimator directly (no GTSAM required) and
|
|
exercise:
|
|
|
|
- AC-1: Protocol conformance (``isinstance`` against ``StateEstimator``).
|
|
- AC-2: Prediction step accuracy on a stationary synthetic IMU
|
|
sequence (drift < 1 m over 5 s of zero specific force).
|
|
- AC-3: ``add_vio`` updates state + shrinks covariance over
|
|
consistent measurements.
|
|
- AC-4: ``add_pose_anchor`` integrates BOTH ``marginals`` and
|
|
``jacobian`` modes (no skip — ESKF has no graph).
|
|
- AC-5: Every emitted ``covariance_6x6`` is SPD; injected non-SPD
|
|
raises ``EstimatorFatalError``.
|
|
- AC-6: ``smoothed_history`` returns ``smoothed=False`` entries.
|
|
- AC-7: ``BUILD_STATE_ESKF=OFF`` rejected by the factory.
|
|
- AC-8: ``SourceLabelStateMachine`` is auto-wired (label updates on
|
|
satellite anchor).
|
|
- AC-9: Filter divergence handling raises ``EstimatorFatalError``.
|
|
- AC-10: ``config.strategy="eskf"`` + ``BUILD_STATE_ESKF=ON``
|
|
resolves to ``EskfStateEstimator`` via the runtime root factory.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import UUID, uuid4
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.nav import (
|
|
FeatureQuality,
|
|
ImuBias,
|
|
ImuSample,
|
|
ImuWindow,
|
|
VioOutput,
|
|
)
|
|
from gps_denied_onboard._types.pose import CovarianceMode, PoseEstimate, Quat
|
|
from gps_denied_onboard._types.state import IsamState, PoseSourceLabel
|
|
from gps_denied_onboard.components.c5_state import (
|
|
C5StateConfig,
|
|
EstimatorFatalError,
|
|
StateEstimator,
|
|
StateEstimatorConfigError,
|
|
)
|
|
from gps_denied_onboard.components.c5_state.eskf_baseline import (
|
|
EskfStateEstimator,
|
|
create,
|
|
register,
|
|
)
|
|
from gps_denied_onboard.config import load_config
|
|
from gps_denied_onboard.config.schema import Config
|
|
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
|
|
from gps_denied_onboard.runtime_root.state_factory import (
|
|
build_state_estimator,
|
|
clear_state_ingest_binding,
|
|
clear_state_registry,
|
|
)
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Fixtures + helpers.
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolation():
|
|
clear_state_registry()
|
|
clear_state_ingest_binding()
|
|
yield
|
|
clear_state_registry()
|
|
clear_state_ingest_binding()
|
|
|
|
|
|
def _build_config(**state_overrides: Any) -> Config:
|
|
cfg = load_config(env={}, paths=(), require_env=False)
|
|
new_state = dataclasses.replace(C5StateConfig(strategy="eskf"), **state_overrides)
|
|
components = dict(cfg.components or {})
|
|
components["c5_state"] = new_state
|
|
return dataclasses.replace(cfg, components=components)
|
|
|
|
|
|
def _make_estimator(**kwargs: Any) -> EskfStateEstimator:
|
|
cfg = _build_config(**kwargs)
|
|
return EskfStateEstimator(
|
|
cfg,
|
|
imu_preintegrator=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
fdr_client=mock.MagicMock(),
|
|
)
|
|
|
|
|
|
def _identity_pose() -> np.ndarray:
|
|
return np.eye(4, dtype=np.float64)
|
|
|
|
|
|
def _pose_translated(x: float, y: float, z: float) -> np.ndarray:
|
|
p = np.eye(4, dtype=np.float64)
|
|
p[0, 3] = x
|
|
p[1, 3] = y
|
|
p[2, 3] = z
|
|
return p
|
|
|
|
|
|
_ESKF_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0))
|
|
_ESKF_NEUTRAL_FQ = FeatureQuality(
|
|
tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0,
|
|
)
|
|
|
|
|
|
def _vio(
|
|
frame_id: int,
|
|
ts_ns: int,
|
|
pose: np.ndarray,
|
|
cov: np.ndarray | None = None,
|
|
) -> VioOutput:
|
|
import gtsam
|
|
return VioOutput(
|
|
frame_id=str(frame_id),
|
|
relative_pose_T=gtsam.Pose3(pose),
|
|
pose_covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
|
|
imu_bias=_ESKF_ZERO_BIAS,
|
|
feature_quality=_ESKF_NEUTRAL_FQ,
|
|
emitted_at_ns=int(ts_ns),
|
|
)
|
|
|
|
|
|
_ENU_ORIGIN = LatLonAlt(lat_deg=0.0, lon_deg=0.0, alt_m=0.0)
|
|
|
|
|
|
def _rot_to_quat(R: np.ndarray) -> Quat:
|
|
"""3x3 rotation → unit quaternion (w,x,y,z); standard robust formula."""
|
|
trace = R[0, 0] + R[1, 1] + R[2, 2]
|
|
if trace > 0:
|
|
s = 0.5 / np.sqrt(trace + 1.0)
|
|
w = 0.25 / s
|
|
x = (R[2, 1] - R[1, 2]) * s
|
|
y = (R[0, 2] - R[2, 0]) * s
|
|
z = (R[1, 0] - R[0, 1]) * s
|
|
elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
|
|
s = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2])
|
|
w = (R[2, 1] - R[1, 2]) / s
|
|
x = 0.25 * s
|
|
y = (R[0, 1] + R[1, 0]) / s
|
|
z = (R[0, 2] + R[2, 0]) / s
|
|
elif R[1, 1] > R[2, 2]:
|
|
s = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2])
|
|
w = (R[0, 2] - R[2, 0]) / s
|
|
x = (R[0, 1] + R[1, 0]) / s
|
|
y = 0.25 * s
|
|
z = (R[1, 2] + R[2, 1]) / s
|
|
else:
|
|
s = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1])
|
|
w = (R[1, 0] - R[0, 1]) / s
|
|
x = (R[0, 2] + R[2, 0]) / s
|
|
y = (R[1, 2] + R[2, 1]) / s
|
|
z = 0.25 * s
|
|
return Quat(w=float(w), x=float(x), y=float(y), z=float(z))
|
|
|
|
|
|
def _pose_anchor(
|
|
frame_id: int,
|
|
ts_ns: int,
|
|
pose: np.ndarray,
|
|
cov: np.ndarray | None = None,
|
|
mode: CovarianceMode | str = CovarianceMode.MARGINALS,
|
|
) -> PoseEstimate:
|
|
rotation = pose[:3, :3]
|
|
translation = np.ascontiguousarray(pose[:3, 3], dtype=np.float64)
|
|
position = WgsConverter.local_enu_to_latlonalt(_ENU_ORIGIN, translation)
|
|
quat = _rot_to_quat(rotation)
|
|
mode_enum = mode if isinstance(mode, CovarianceMode) else CovarianceMode(mode.lower())
|
|
return PoseEstimate(
|
|
frame_id=UUID(int=frame_id),
|
|
position_wgs84=position,
|
|
orientation_world_T_body=quat,
|
|
covariance_6x6=cov if cov is not None else np.eye(6) * 0.01,
|
|
covariance_mode=mode_enum,
|
|
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
|
|
last_satellite_anchor_age_ms=0,
|
|
emitted_at=ts_ns,
|
|
)
|
|
|
|
|
|
def _imu_window(
|
|
*, ts_start_ns: int, dt_s: float, n_samples: int, accel: tuple, gyro: tuple
|
|
) -> ImuWindow:
|
|
samples = tuple(
|
|
ImuSample(
|
|
ts_ns=ts_start_ns + int(i * dt_s * 1_000_000_000),
|
|
accel_xyz=accel,
|
|
gyro_xyz=gyro,
|
|
)
|
|
for i in range(n_samples)
|
|
)
|
|
return ImuWindow(
|
|
samples=samples,
|
|
ts_start_ns=samples[0].ts_ns,
|
|
ts_end_ns=samples[-1].ts_ns,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1: Protocol conformance.
|
|
|
|
|
|
def test_ac1_eskf_satisfies_state_estimator_protocol() -> None:
|
|
estimator = _make_estimator()
|
|
|
|
assert isinstance(estimator, StateEstimator)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2: Prediction step accuracy.
|
|
|
|
|
|
def test_ac2_prediction_drift_under_one_metre_over_five_seconds() -> None:
|
|
# Arrange — stationary FC: specific force = -gravity in body frame
|
|
# (so world-frame accel is 0). Zero gyro. 100 Hz over 5 s.
|
|
estimator = _make_estimator()
|
|
accel_body_stationary = (0.0, 0.0, 9.80665)
|
|
window = _imu_window(
|
|
ts_start_ns=1_000_000_000,
|
|
dt_s=0.01,
|
|
n_samples=501,
|
|
accel=accel_body_stationary,
|
|
gyro=(0.0, 0.0, 0.0),
|
|
)
|
|
|
|
# Act
|
|
estimator.add_fc_imu(window)
|
|
|
|
# Assert — under perfect zero specific force the nominal
|
|
# position must remain near the origin. Forward Euler accrues a
|
|
# small residual from quaternion-renormalisation noise.
|
|
drift = float(np.linalg.norm(estimator._nominal_pos))
|
|
assert drift < 1.0
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3: Relative-pose update via add_vio.
|
|
|
|
|
|
def test_ac3_add_vio_updates_state_and_covariance_shrinks() -> None:
|
|
# Arrange — two consistent VIO frames; first frame seeds, second
|
|
# frame measures a 1 m forward delta.
|
|
estimator = _make_estimator()
|
|
cov_before_first = estimator._pose_covariance_6x6().copy()
|
|
estimator.add_vio(_vio(1, 1_000_000_000, _identity_pose()))
|
|
estimator.add_vio(_vio(2, 1_100_000_000, _pose_translated(1.0, 0.0, 0.0)))
|
|
|
|
# Assert
|
|
cov_after = estimator._pose_covariance_6x6()
|
|
assert np.linalg.norm(cov_after, ord="fro") <= np.linalg.norm(cov_before_first, ord="fro")
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4: Absolute-pose update; mode doesn't skip ESKF.
|
|
|
|
|
|
def test_ac4_add_pose_anchor_runs_for_marginals_and_jacobian() -> None:
|
|
# Arrange — two anchors, one in each mode; both must shift the
|
|
# nominal position toward the measurement.
|
|
estimator_m = _make_estimator()
|
|
estimator_j = _make_estimator()
|
|
pose_m = _pose_anchor(1, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="marginals")
|
|
pose_j = _pose_anchor(2, 1_000_000_000, _pose_translated(5.0, 0.0, 0.0), mode="jacobian")
|
|
|
|
# Act
|
|
estimator_m.add_pose_anchor(pose_m)
|
|
estimator_j.add_pose_anchor(pose_j)
|
|
|
|
# Assert — both modes move the nominal toward the measurement.
|
|
assert estimator_m._nominal_pos[0] > 0.1
|
|
assert estimator_j._nominal_pos[0] > 0.1
|
|
assert estimator_m._last_anchor_ns > 0
|
|
assert estimator_j._last_anchor_ns > 0
|
|
|
|
|
|
def test_ac4_pose_anchor_sets_last_anchor_ns() -> None:
|
|
estimator = _make_estimator()
|
|
pose = _pose_anchor(1, 1_000_000_000, _pose_translated(1.0, 0.0, 0.0))
|
|
|
|
estimator.add_pose_anchor(pose)
|
|
|
|
assert estimator._last_anchor_ns > 0
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5: SPD covariance enforcement.
|
|
|
|
|
|
def test_ac5_current_estimate_emits_spd_covariance() -> None:
|
|
estimator = _make_estimator()
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
|
|
out = estimator.current_estimate()
|
|
|
|
assert out.covariance_6x6.shape == (6, 6)
|
|
np.linalg.cholesky(out.covariance_6x6)
|
|
|
|
|
|
def test_ac5_non_spd_covariance_raises_fatal() -> None:
|
|
# Arrange — force the internal covariance to be indefinite.
|
|
estimator = _make_estimator()
|
|
P = np.eye(16, dtype=np.float64)
|
|
P[0, 0] = -1.0 # negative variance on x-position
|
|
estimator._P = P
|
|
|
|
# Act / Assert
|
|
with pytest.raises(EstimatorFatalError, match="not SPD"):
|
|
estimator.current_estimate()
|
|
assert estimator._isam2_state == IsamState.LOST
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6: smoothed_history honesty.
|
|
|
|
|
|
def test_ac6_smoothed_history_returns_smoothed_false_entries() -> None:
|
|
estimator = _make_estimator()
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
estimator.current_estimate()
|
|
estimator.add_pose_anchor(_pose_anchor(2, 1_100_000_000, _pose_translated(0.5, 0.0, 0.0)))
|
|
estimator.current_estimate()
|
|
|
|
history = estimator.smoothed_history(n_keyframes=10)
|
|
|
|
assert len(history) == 2
|
|
assert all(entry.smoothed is False for entry in history)
|
|
|
|
|
|
def test_ac6_smoothed_history_empty_buffer_returns_empty() -> None:
|
|
estimator = _make_estimator()
|
|
|
|
assert estimator.smoothed_history(10) == []
|
|
|
|
|
|
def test_ac6_smoothed_history_n_zero_returns_empty() -> None:
|
|
estimator = _make_estimator()
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
estimator.current_estimate()
|
|
|
|
assert estimator.smoothed_history(0) == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7: BUILD_STATE_ESKF=OFF rejection.
|
|
|
|
|
|
def test_ac7_build_state_eskf_off_rejected(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
register()
|
|
monkeypatch.setenv("BUILD_STATE_ESKF", "OFF")
|
|
cfg = _build_config()
|
|
|
|
with pytest.raises(StateEstimatorConfigError, match="BUILD_STATE_ESKF is OFF"):
|
|
build_state_estimator(
|
|
cfg,
|
|
imu_preintegrator=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
fdr_client=mock.MagicMock(),
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8: SourceLabelStateMachine integration.
|
|
|
|
|
|
def test_ac8_source_label_machine_is_auto_wired() -> None:
|
|
# Arrange — gate is open by default; the first satellite anchor
|
|
# transitions the label DEAD_RECKONED → SATELLITE_ANCHORED.
|
|
estimator = _make_estimator()
|
|
assert estimator.current_estimate().source_label == PoseSourceLabel.DEAD_RECKONED
|
|
|
|
# Act
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
out_after = estimator.current_estimate()
|
|
|
|
# Assert
|
|
assert out_after.source_label == PoseSourceLabel.SATELLITE_ANCHORED
|
|
|
|
|
|
def test_ac8_spoof_status_closes_gate_and_rejects_anchor() -> None:
|
|
# Arrange — SPOOFED GPS latches the gate closed; next anchor is
|
|
# a promotion attempt that fails (no consistency check possible
|
|
# because the estimator passes ``gps_consistency_delta_m=None``).
|
|
estimator = _make_estimator()
|
|
received: list[Any] = []
|
|
estimator.subscribe_spoof_rejection(
|
|
lambda reason, severity, text: received.append((reason, severity, text))
|
|
)
|
|
estimator.notify_gps_health(_gps_status(GpsStatus.SPOOFED), now_ns=0)
|
|
|
|
# Act
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
|
|
# Assert
|
|
assert len(received) == 1
|
|
assert estimator.current_estimate().source_label == PoseSourceLabel.VISUAL_PROPAGATED
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9: Filter divergence handling.
|
|
|
|
|
|
def test_ac9_filter_divergence_raises_fatal() -> None:
|
|
# Arrange — feed a pose anchor whose measured position is wildly
|
|
# inconsistent with the nominal state, with a tight measurement
|
|
# covariance. innov_mag = 1000 m; sqrt(||R||_F) ≈ 1e-2 → ratio ≫ 10.
|
|
estimator = _make_estimator()
|
|
far_pose = _pose_translated(1000.0, 0.0, 0.0)
|
|
tight_cov = np.eye(6) * 1e-6
|
|
|
|
# Act / Assert
|
|
with pytest.raises(EstimatorFatalError, match="filter divergence"):
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, far_pose, cov=tight_cov))
|
|
assert estimator._isam2_state == IsamState.LOST
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10: Composition wiring through the runtime root.
|
|
|
|
|
|
def test_ac10_factory_resolves_eskf_strategy(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
register()
|
|
monkeypatch.setenv("BUILD_STATE_ESKF", "ON")
|
|
cfg = _build_config(strategy="eskf")
|
|
|
|
estimator, handle = build_state_estimator(
|
|
cfg,
|
|
imu_preintegrator=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
fdr_client=mock.MagicMock(),
|
|
)
|
|
|
|
assert isinstance(estimator, EskfStateEstimator)
|
|
assert handle is None
|
|
|
|
|
|
def test_ac10_module_level_create_returns_protocol_and_none_handle() -> None:
|
|
cfg = _build_config()
|
|
|
|
estimator, handle = create(
|
|
config=cfg,
|
|
imu_preintegrator=mock.MagicMock(),
|
|
se3_utils=mock.MagicMock(),
|
|
wgs_converter=mock.MagicMock(),
|
|
fdr_client=mock.MagicMock(),
|
|
)
|
|
|
|
assert isinstance(estimator, StateEstimator)
|
|
assert handle is None
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Other robustness checks.
|
|
|
|
|
|
def test_health_snapshot_reports_init_then_tracking() -> None:
|
|
estimator = _make_estimator()
|
|
|
|
assert estimator.health_snapshot().isam2_state == IsamState.INIT
|
|
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
|
|
assert estimator.health_snapshot().isam2_state == IsamState.TRACKING
|
|
|
|
|
|
def test_current_estimate_returns_uuid_frame_id_and_increments_keyframes() -> None:
|
|
estimator = _make_estimator()
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
|
|
out = estimator.current_estimate()
|
|
|
|
assert isinstance(out.frame_id, type(uuid4()))
|
|
assert estimator.health_snapshot().keyframe_count == 1
|
|
|
|
|
|
def test_out_of_order_vio_raises_degraded() -> None:
|
|
from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError
|
|
|
|
estimator = _make_estimator()
|
|
estimator.add_vio(_vio(1, 2_000_000_000, _identity_pose()))
|
|
|
|
with pytest.raises(EstimatorDegradedError, match="out-of-order"):
|
|
estimator.add_vio(_vio(2, 1_000_000_000, _identity_pose()))
|
|
|
|
|
|
def test_pose_anchor_sets_velocity_to_zero_initially() -> None:
|
|
estimator = _make_estimator()
|
|
estimator.add_pose_anchor(_pose_anchor(1, 1_000_000_000, _identity_pose()))
|
|
|
|
out = estimator.current_estimate()
|
|
assert out.velocity_world_mps == (0.0, 0.0, 0.0)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Helpers for AC-8 GPS health construction.
|
|
|
|
|
|
def _gps_status(status: GpsStatus) -> GpsHealth:
|
|
return GpsHealth(status=status, fix_age_ms=0, captured_at=0)
|