Files
gps-denied-onboard/tests/unit/test_safety_anchor_wrapper.py
T
Oleksandr Bezdieniezhnykh 9fb9e4a349 [AZ-232] Add safety anchor state machine
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 19:10:10 +03:00

103 lines
3.4 KiB
Python

from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext
from shared.contracts import AnchorDecision, VioStatePacket
def _telemetry() -> TelemetryContext:
    """Build the fixed telemetry sample used as input by the tests in this module."""
    fields = {
        "timestamp_ns": 1_000_000,
        "latitude_hint_deg": 49.1,
        "longitude_hint_deg": 36.1,
        "altitude_m": 120.0,
    }
    return TelemetryContext(**fields)
def _vio_state(**overrides: object) -> VioStatePacket:
    """Return a validated VIO packet built from default fields.

    Any keyword argument replaces the default field of the same name (or adds
    a new field) before the payload is passed to ``VioStatePacket.model_validate``.
    """
    defaults: dict[str, object] = {
        "timestamp_ns": 1_000_000,
        "relative_pose": {"x_m": 1.0, "y_m": 0.0, "z_m": 0.0},
        "velocity_mps": (12.0, 0.0, 0.0),
        "tracking_quality": 0.9,
        "covariance_hint": [[1.8, 0.0], [0.0, 1.8]],
    }
    # Overrides are unpacked last so they take precedence over the defaults.
    return VioStatePacket.model_validate({**defaults, **overrides})
def _accepted_anchor() -> AnchorDecision:
    """Build an anchor decision marked as accepted for candidate ``chunk-1``."""
    pose = {"latitude_deg": 49.2, "longitude_deg": 36.2, "altitude_m": 121.0}
    return AnchorDecision(
        candidate_id="chunk-1",
        accepted=True,
        estimated_pose=pose,
        inliers=48,
        mean_reprojection_error_px=1.2,
    )
def test_vio_state_updates_position_estimate_with_honest_covariance() -> None:
    """A VIO update on a fresh machine yields a ``vo_extrapolated`` estimate.

    The estimate must carry the telemetry latitude hint, report the semimajor
    covariance from the packet's hint, and never claim a horizontal accuracy
    tighter than that covariance.
    """
    # Arrange
    state_machine = SafetyAnchorStateMachine()

    # Act
    result = state_machine.update_vio(_vio_state(), _telemetry())

    # Assert
    estimate = result.estimate
    assert estimate.source_label == "vo_extrapolated"
    assert estimate.latitude_deg == 49.1
    assert estimate.covariance_semimajor_m == 1.8
    assert estimate.horizontal_accuracy_m >= estimate.covariance_semimajor_m
def test_accepted_anchor_corrects_state_and_records_evidence() -> None:
    """An accepted anchor switches the mode, moves the estimate, and keeps evidence.

    After one VIO update, considering the accepted anchor must report
    ``satellite_anchored``, adopt the anchor's latitude, and expose anchor
    evidence that names the originating candidate.
    """
    # Arrange
    state_machine = SafetyAnchorStateMachine()
    state_machine.update_vio(_vio_state(), _telemetry())

    # Act
    result = state_machine.consider_anchor(_accepted_anchor())

    # Assert
    assert result.mode == "satellite_anchored"
    assert result.estimate.latitude_deg == 49.2
    evidence = result.anchor_evidence
    assert evidence is not None
    assert evidence.candidate_id == "chunk-1"
def test_blackout_degrades_then_reaches_no_fix_with_monotonic_covariance() -> None:
    """Two blackout steps go ``dead_reckoned`` then ``no_fix`` with growing covariance.

    The machine is seeded with a covariance hint of 100.0 and configured with
    a growth of 250.0 and a no-fix threshold of 500.0. The first step is
    expected to report a semimajor covariance of 350.0; the second must report
    ``no_fix`` with fix type 0 and a strictly larger covariance.
    """
    # Arrange
    config = SafetyStateConfig(
        dead_reckoning_growth_m=250.0,
        no_fix_covariance_threshold_m=500.0,
    )
    state_machine = SafetyAnchorStateMachine(config)
    seed_packet = _vio_state(covariance_hint=[[100.0]])
    state_machine.update_vio(seed_packet, _telemetry())

    # Act
    first_step = state_machine.propagate_blackout(2_000_000)
    second_step = state_machine.propagate_blackout(3_000_000)

    # Assert
    assert first_step.mode == "dead_reckoned"
    assert first_step.estimate.covariance_semimajor_m == 350.0
    assert second_step.mode == "no_fix"
    assert second_step.estimate.fix_type == 0
    assert (
        second_step.estimate.covariance_semimajor_m
        > first_step.estimate.covariance_semimajor_m
    )
def test_tile_write_eligibility_requires_trusted_low_covariance_pose() -> None:
    """Tile writes are eligible only after an accepted anchor, not before or in blackout.

    Eligibility is sampled at three points: after a VIO update whose covariance
    hint (4.0) exceeds the configured maximum (3.0), after an accepted anchor,
    and after a blackout propagation step.
    """
    # Arrange
    config = SafetyStateConfig(tile_write_covariance_max_m=3.0)
    state_machine = SafetyAnchorStateMachine(config)
    state_machine.update_vio(_vio_state(covariance_hint=[[4.0]]), _telemetry())

    # Act: the order matters, each sample reflects the machine state at that moment.
    before_anchor = state_machine.tile_write_eligibility()
    state_machine.consider_anchor(_accepted_anchor())
    after_anchor = state_machine.tile_write_eligibility()
    state_machine.propagate_blackout(2_000_000)
    after_blackout = state_machine.tile_write_eligibility()

    # Assert
    assert before_anchor.eligible is False
    assert before_anchor.reason == "covariance_too_high"
    assert after_anchor.eligible is True
    assert after_anchor.reason == "trusted_pose"
    assert after_blackout.eligible is False
    assert after_blackout.reason == "untrusted_source_label"