[AZ-490] C5 set_takeoff_origin entrypoint + bounded-delta GPS gate

Add operator warm-start path to C5 StateEstimator Protocol and both
implementations (GtsamIsam2StateEstimator, EskfStateEstimator), plus
the third clause of the AZ-385 spoof-promotion gate.

- StateEstimator Protocol: set_takeoff_origin(origin, sigma_horiz_m,
  sigma_vert_m) -> None.
- iSAM2: PriorFactorPose3 at origin with diagonal sigmas, single
  isam2.update().
- ESKF: zero _nominal_pos, overwrite _P position block with sigma**2.
- SourceLabelStateMachine.process_gps_sample bounded-delta clause:
  WgsConverter.horizontal_distance_m vs smoother estimate; reject
  resets the dwell-time counter so AZ-385 cannot re-promote off bad
  GPS.
- New EstimatorAlreadyStartedError (StateEstimatorConfigError
  subclass) on late call after first add_*.
- C5StateConfig: spoof_promotion_bounded_delta_m=200,
  default_takeoff_origin_sigma_horiz_m=5,
  default_takeoff_origin_sigma_vert_m=10.
- New GpsSample DTO + WgsConverter.horizontal_distance_m helper.
- 4 new FDR kinds (cold_start_origin.{set,unavailable},
  gps_bounded_delta.{accept,reject}) registered in AZ-272 schema.
- 33 new unit tests cover AC-1..AC-15; full repo 750 passed / 2
  skipped (pre-existing CI tooling skips).

Docs synced: protocol contract, C5 component description,
architecture, glossary, system-flows, C10 provisioning description.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-12 02:53:58 +03:00
parent 72a06edab0
commit 8a83166261
23 changed files with 1640 additions and 26 deletions
@@ -93,6 +93,14 @@ def _build_config(**state_overrides: Any) -> Config:
class _FakeEstimator:
"""Test fake satisfying every StateEstimator method (AC-1)."""
def set_takeoff_origin(
self,
origin: LatLonAlt,
sigma_horiz_m: float,
sigma_vert_m: float,
) -> None:
pass
def add_vio(self, vio: VioOutput) -> None:
pass
@@ -133,7 +141,7 @@ def test_ac1_missing_method_fails_isinstance() -> None:
def add_vio(self, vio: VioOutput) -> None:
pass
# Assert — missing 5 methods → not a StateEstimator
# Assert — missing 6 methods → not a StateEstimator
assert not isinstance(_Incomplete(), StateEstimator)
@@ -71,6 +71,7 @@ def _make_sm(
*,
min_stable_s: float = 10.0,
tol_m: float = 30.0,
bounded_delta_m: float = 200.0,
clock: _Clock | None = None,
fdr_client: mock.MagicMock | None = None,
) -> tuple[SourceLabelStateMachine, _Clock, mock.MagicMock]:
@@ -80,6 +81,7 @@ def _make_sm(
sm = SourceLabelStateMachine(
spoof_promotion_min_stable_s=min_stable_s,
spoof_promotion_visual_consistency_tol_m=tol_m,
spoof_promotion_bounded_delta_m=bounded_delta_m,
fdr_client=fdr,
producer_id="c5_state",
clock_ns=clock,
@@ -0,0 +1,647 @@
"""AZ-490 — ``set_takeoff_origin`` + bounded-delta GPS gate.
Covers AC-1..AC-15 from
``_docs/02_tasks/todo/AZ-490_c5_set_takeoff_origin.md``.
The tests construct estimators directly (the iSAM2 estimator
exercises the real GTSAM ``PriorFactorPose3`` insertion; the ESKF
estimator exercises the real NumPy covariance write). The
``SourceLabelStateMachine`` is exercised in isolation for the
bounded-delta clauses (AC-9..AC-11, AC-15) to keep the iSAM2
machinery out of the gate-only tests.
"""
from __future__ import annotations
import dataclasses
import math
from datetime import datetime, timezone
from typing import Any
from unittest import mock
from uuid import UUID
import numpy as np
import pytest
from gps_denied_onboard._types.fc import GpsSample
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.pose import (
CovarianceMode,
PoseEstimate,
Quat,
)
from gps_denied_onboard._types.state import PoseSourceLabel
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard.components.c5_state import (
C5StateConfig,
EstimatorAlreadyStartedError,
StateEstimator,
StateEstimatorConfigError,
)
from gps_denied_onboard.components.c5_state._source_label_sm import (
BOUNDED_DELTA_REJECT,
BOUNDED_DELTA_SOFT,
SourceLabelStateMachine,
)
from gps_denied_onboard.components.c5_state.eskf_baseline import EskfStateEstimator
from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import (
GtsamIsam2StateEstimator,
create as create_isam2,
)
from gps_denied_onboard.fdr_client.records import FdrRecord, parse, serialise
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.runtime_root.state_factory import clear_state_registry
# ---------------------------------------------------------------------
# Fixtures + builders.
@pytest.fixture(autouse=True)
def _registry_isolation() -> Any:
clear_state_registry()
yield
clear_state_registry()
def _build_isam2(**overrides: Any) -> GtsamIsam2StateEstimator:
block = C5StateConfig(strategy="gtsam_isam2", keyframe_window_size=15, **overrides)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
estimator, _ = create_isam2(
config=cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
return estimator
def _build_eskf(**overrides: Any) -> EskfStateEstimator:
block = C5StateConfig(strategy="eskf", keyframe_window_size=15, **overrides)
cfg = mock.MagicMock()
cfg.components = {"c5_state": block}
return EskfStateEstimator(
cfg,
imu_preintegrator=mock.MagicMock(),
se3_utils=mock.MagicMock(),
wgs_converter=mock.MagicMock(),
fdr_client=mock.MagicMock(),
)
def _origin() -> LatLonAlt:
return LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
def _vio(*, frame_id: int, t_seconds: float) -> VioOutput:
return VioOutput(
frame_id=frame_id,
timestamp=datetime.fromtimestamp(t_seconds, tz=timezone.utc),
pose_se3=np.eye(4),
covariance_6x6=np.eye(6) * 0.01,
)
def _pose_anchor(*, frame_id: int, t_seconds: float) -> PoseEstimate:
return PoseEstimate(
frame_id=UUID(int=frame_id),
position_wgs84=_origin(),
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
covariance_6x6=np.eye(6) * 0.01,
covariance_mode=CovarianceMode.MARGINALS,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=0,
emitted_at=int(t_seconds * 1_000_000_000),
)
def _cold_start_records(estimator: GtsamIsam2StateEstimator | EskfStateEstimator) -> list[FdrRecord]:
enqueued = estimator._fdr_client.enqueue.call_args_list # type: ignore[union-attr]
out = []
for call in enqueued:
args, _ = call
if not args:
continue
record = args[0]
if isinstance(record, FdrRecord) and record.kind == "c5.cold_start_origin.set":
out.append(record)
return out
def _bounded_delta_records(
fdr: mock.MagicMock, *, kind: str
) -> list[FdrRecord]:
out = []
for call in fdr.enqueue.call_args_list:
args, _ = call
if args and isinstance(args[0], FdrRecord) and args[0].kind == kind:
out.append(args[0])
return out
class _Clock:
"""Mutable monotonic clock used by SourceLabelStateMachine in tests."""
def __init__(self, t0: int = 0) -> None:
self.t = t0
def __call__(self) -> int:
return self.t
def _make_sm(
*,
bounded_delta_m: float = 200.0,
fdr_client: mock.MagicMock | None = None,
clock: _Clock | None = None,
) -> tuple[SourceLabelStateMachine, mock.MagicMock]:
fdr = fdr_client if fdr_client is not None else mock.MagicMock()
sm = SourceLabelStateMachine(
spoof_promotion_min_stable_s=10.0,
spoof_promotion_visual_consistency_tol_m=30.0,
spoof_promotion_bounded_delta_m=bounded_delta_m,
fdr_client=fdr,
producer_id="c5_state",
clock_ns=clock if clock is not None else _Clock(0),
)
return sm, fdr
# ---------------------------------------------------------------------
# AC-1: Protocol conformance — both impls expose set_takeoff_origin.
def test_ac1_isam2_is_state_estimator() -> None:
estimator = _build_isam2()
assert isinstance(estimator, StateEstimator)
assert callable(estimator.set_takeoff_origin)
def test_ac1_eskf_is_state_estimator() -> None:
estimator = _build_eskf()
assert isinstance(estimator, StateEstimator)
assert callable(estimator.set_takeoff_origin)
# ---------------------------------------------------------------------
# AC-2: iSAM2 happy path — origin seeds the smoother prior.
def test_ac2_isam2_set_takeoff_origin_seeds_prior_and_emits_fdr() -> None:
# Arrange
estimator = _build_isam2()
# Act
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# Assert — exactly one PriorFactorPose3 was committed
import gtsam
factors = estimator._isam2.getFactorsUnsafe()
prior_factors = [
factors.at(i)
for i in range(factors.size())
if isinstance(factors.at(i), gtsam.PriorFactorPose3)
]
assert len(prior_factors) == 1
# Pose is at Identity (origin == ENU(0,0,0))
pose = prior_factors[0].prior()
assert np.allclose(np.asarray(pose.translation()), np.zeros(3), atol=1e-9)
# Assert — exactly one cold-start FDR record with source="manifest"
records = _cold_start_records(estimator)
assert len(records) == 1
assert records[0].payload["source"] == "manifest"
assert records[0].payload["lat_deg"] == _origin().lat_deg
assert records[0].payload["sigma_horiz_m"] == 5.0
assert records[0].payload["sigma_vert_m"] == 10.0
# Assert — ENU origin tracks the operator origin
assert estimator._enu_origin == _origin()
# ---------------------------------------------------------------------
# AC-3: ESKF happy path — nominal state + P block seeded.
def test_ac3_eskf_set_takeoff_origin_seeds_state_and_p() -> None:
# Arrange
estimator = _build_eskf()
# Act
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# Assert — nominal position is zero (origin IS the ENU(0,0,0) anchor)
assert np.allclose(estimator._nominal_pos, np.zeros(3), atol=1e-12)
# Assert — position block of P matches diag(25, 25, 100)
expected = np.diag([25.0, 25.0, 100.0])
assert np.allclose(estimator._P[0:3, 0:3], expected, atol=1e-12)
# Assert — ENU origin tracks the operator origin
assert estimator._enu_origin == _origin()
# Assert — exactly one FDR record
records = _cold_start_records(estimator)
assert len(records) == 1
assert records[0].payload["source"] == "manifest"
# ---------------------------------------------------------------------
# AC-4: Idempotent — calling twice with identical args is a no-op.
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac4_idempotent_double_call_is_noop(builder: Any) -> None:
# Arrange
estimator = builder()
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
fdr_calls_before = len(estimator._fdr_client.enqueue.call_args_list)
# Act — second identical call
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# Assert — no extra FDR record, no exception
fdr_calls_after = len(estimator._fdr_client.enqueue.call_args_list)
assert fdr_calls_after == fdr_calls_before
# ---------------------------------------------------------------------
# AC-5: Conflict — calling twice with different args raises.
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac5_conflict_double_call_raises(builder: Any) -> None:
# Arrange
estimator = builder()
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# Act + Assert — different origin
different = LatLonAlt(lat_deg=51.0, lon_deg=36.2, alt_m=200.0)
with pytest.raises(StateEstimatorConfigError) as excinfo:
estimator.set_takeoff_origin(different, sigma_horiz_m=5.0, sigma_vert_m=10.0)
msg = str(excinfo.value)
# Both old and new origins named in the message
assert "previous=" in msg and "new=" in msg
# ---------------------------------------------------------------------
# AC-6: Late call — set_takeoff_origin after first add_vio raises.
def test_ac6_isam2_late_call_raises_already_started() -> None:
# Arrange
estimator = _build_isam2()
estimator._isam2_handle = mock.MagicMock() # stub the handle for add_vio
# Act — first add_vio closes the cold-start window
estimator.add_vio(_vio(frame_id=1, t_seconds=1.0))
# Assert
with pytest.raises(EstimatorAlreadyStartedError):
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
def test_ac6_eskf_late_call_raises_already_started() -> None:
# Arrange
estimator = _build_eskf()
estimator.add_vio(_vio(frame_id=1, t_seconds=1.0))
# Assert
with pytest.raises(EstimatorAlreadyStartedError):
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# ---------------------------------------------------------------------
# AC-7: Out-of-bounds latitude raises StateEstimatorConfigError.
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac7_invalid_latlonalt_raises(builder: Any) -> None:
# Arrange
estimator = builder()
bad_origin = LatLonAlt(lat_deg=95.0, lon_deg=36.0, alt_m=200.0)
# Act + Assert
with pytest.raises(StateEstimatorConfigError, match=r"latitude 95\.0 outside"):
estimator.set_takeoff_origin(bad_origin, sigma_horiz_m=5.0, sigma_vert_m=10.0)
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac7_invalid_longitude_raises(builder: Any) -> None:
estimator = builder()
bad = LatLonAlt(lat_deg=50.0, lon_deg=200.0, alt_m=200.0)
with pytest.raises(StateEstimatorConfigError, match=r"longitude 200\.0 outside"):
estimator.set_takeoff_origin(bad, sigma_horiz_m=5.0, sigma_vert_m=10.0)
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac7_non_finite_lat_raises(builder: Any) -> None:
estimator = builder()
bad = LatLonAlt(lat_deg=float("nan"), lon_deg=36.0, alt_m=200.0)
with pytest.raises(StateEstimatorConfigError, match="non-finite"):
estimator.set_takeoff_origin(bad, sigma_horiz_m=5.0, sigma_vert_m=10.0)
# ---------------------------------------------------------------------
# AC-8: Negative sigma raises.
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac8_negative_horiz_sigma_raises(builder: Any) -> None:
estimator = builder()
with pytest.raises(StateEstimatorConfigError, match="sigma_horiz_m must be positive"):
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=-5.0, sigma_vert_m=10.0)
@pytest.mark.parametrize("builder", [_build_isam2, _build_eskf])
def test_ac8_zero_vert_sigma_raises(builder: Any) -> None:
estimator = builder()
with pytest.raises(StateEstimatorConfigError, match="sigma_vert_m must be positive"):
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=0.0)
# ---------------------------------------------------------------------
# AC-9: Bounded-delta accept — sample within ring is admitted.
def test_ac9_bounded_delta_accept_emits_soft_label() -> None:
# Arrange
sm, fdr = _make_sm(bounded_delta_m=200.0)
smoother_estimate = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
sample = LatLonAlt(lat_deg=50.0008, lon_deg=36.2008, alt_m=200.0)
# Sanity — distance is well under the ring (≈ 100 m at 50° N).
distance = WgsConverter.horizontal_distance_m(smoother_estimate, sample)
assert distance < 200.0
# Act
result = sm.process_gps_sample(sample, smoother_estimate=smoother_estimate)
# Assert
assert result == BOUNDED_DELTA_SOFT
accepts = _bounded_delta_records(fdr, kind="c5.gps_bounded_delta.accept")
assert len(accepts) == 1
assert accepts[0].payload["distance_m"] == pytest.approx(distance, rel=1e-9)
assert accepts[0].payload["threshold_m"] == 200.0
# ---------------------------------------------------------------------
# AC-10: Bounded-delta reject — sample outside ring is dropped.
def test_ac10_bounded_delta_reject_emits_record_and_resets_dwell() -> None:
# Arrange
sm, fdr = _make_sm(bounded_delta_m=200.0)
smoother_estimate = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
sample = LatLonAlt(lat_deg=50.005, lon_deg=36.205, alt_m=200.0)
distance = WgsConverter.horizontal_distance_m(smoother_estimate, sample)
assert distance > 200.0
# Seed a STABLE_NON_SPOOFED dwell so we can observe its reset
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus
sm.notify_gps_health(
GpsHealth(status=GpsStatus.STABLE_NON_SPOOFED, fix_age_ms=10, captured_at=0)
)
assert sm._gps_health_stable_since_ns is not None # arrange precondition
# Act
result = sm.process_gps_sample(sample, smoother_estimate=smoother_estimate)
# Assert
assert result == BOUNDED_DELTA_REJECT
rejects = _bounded_delta_records(fdr, kind="c5.gps_bounded_delta.reject")
assert len(rejects) == 1
rec = rejects[0].payload
assert rec["sample_lat"] == sample.lat_deg
assert rec["smoother_lat"] == smoother_estimate.lat_deg
assert rec["distance_m"] == pytest.approx(distance, rel=1e-9)
# Dwell-time clause was reset by the reject
assert sm._gps_health_stable_since_ns is None
# ---------------------------------------------------------------------
# AC-11: Threshold is config-driven — relaxing it admits AC-10's sample.
def test_ac11_threshold_relaxed_admits_previously_rejected_sample() -> None:
sm, _fdr = _make_sm(bounded_delta_m=1000.0)
smoother_estimate = LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
sample = LatLonAlt(lat_deg=50.005, lon_deg=36.205, alt_m=200.0)
result = sm.process_gps_sample(sample, smoother_estimate=smoother_estimate)
assert result == BOUNDED_DELTA_SOFT
# ---------------------------------------------------------------------
# AC-12: FDR record kinds round-trip through the schema.
@pytest.mark.parametrize(
("kind", "payload"),
[
(
"c5.cold_start_origin.set",
{
"source": "manifest",
"lat_deg": 50.0,
"lon_deg": 36.2,
"alt_m": 200.0,
"sigma_horiz_m": 5.0,
"sigma_vert_m": 10.0,
},
),
(
"c5.cold_start_origin.unavailable",
{"reason": "no_manifest_origin_no_fc_ekf"},
),
(
"c5.gps_bounded_delta.accept",
{
"sample_lat": 50.0,
"sample_lon": 36.2,
"smoother_lat": 50.0001,
"smoother_lon": 36.2,
"distance_m": 11.1,
"threshold_m": 200.0,
},
),
(
"c5.gps_bounded_delta.reject",
{
"sample_lat": 50.0,
"sample_lon": 36.2,
"smoother_lat": 50.005,
"smoother_lon": 36.205,
"distance_m": 700.0,
"threshold_m": 200.0,
},
),
],
)
def test_ac12_fdr_kinds_round_trip(kind: str, payload: dict[str, Any]) -> None:
record = FdrRecord(
schema_version=1,
ts=datetime.now(tz=timezone.utc).isoformat(),
producer_id="c5_state",
kind=kind,
payload=payload,
)
encoded = serialise(record)
decoded = parse(encoded)
assert decoded.kind == kind
assert decoded.payload == payload
# No unknown-key bucket for the AZ-490 shape (every key is registered)
assert "extra" not in decoded.payload
# ---------------------------------------------------------------------
# AC-13: No origin → FC-EKF cold-start path emits one fc_ekf record.
def test_ac13_isam2_no_origin_emits_fc_ekf_record_on_first_add() -> None:
estimator = _build_isam2()
estimator._isam2_handle = mock.MagicMock()
estimator.add_vio(_vio(frame_id=1, t_seconds=1.0))
estimator.add_vio(_vio(frame_id=2, t_seconds=2.0))
records = _cold_start_records(estimator)
assert len(records) == 1
assert records[0].payload["source"] == "fc_ekf"
def test_ac13_eskf_no_origin_emits_fc_ekf_record_on_first_add() -> None:
estimator = _build_eskf()
estimator.add_vio(_vio(frame_id=1, t_seconds=1.0))
estimator.add_vio(_vio(frame_id=2, t_seconds=2.0))
records = _cold_start_records(estimator)
assert len(records) == 1
assert records[0].payload["source"] == "fc_ekf"
# ---------------------------------------------------------------------
# AC-14: AZ-385's first two clauses are unchanged — bounded-delta is additive.
#
# We re-run the spoof-gate engagement / gate-lift path WITHOUT touching
# the bounded-delta method and assert the canonical gate behaviour
# still holds. The full AZ-385 acceptance suite re-runs in
# test_az385_source_label_spoof_gate.py — this test is the smoke
# check that the new clause did not perturb the existing wiring.
def test_ac14_existing_spoof_gate_unchanged_by_new_clause() -> None:
from gps_denied_onboard._types.fc import GpsHealth, GpsStatus
clock = _Clock(0)
sm, _ = _make_sm(bounded_delta_m=200.0, clock=clock)
# Initial — DEAD_RECKONED until first anchor
assert sm.current_label() == PoseSourceLabel.DEAD_RECKONED
# Anchor without a spoof event → SATELLITE_ANCHORED
sm.notify_gps_health(
GpsHealth(status=GpsStatus.STABLE_NON_SPOOFED, fix_age_ms=10, captured_at=0)
)
clock.t = 1_000_000_000 # 1 s in
sm.notify_satellite_anchor(now_ns=clock.t, gps_consistency_delta_m=1.0)
assert sm.current_label() == PoseSourceLabel.SATELLITE_ANCHORED
# Spoof event → gate latches closed → VISUAL_PROPAGATED
sm.notify_gps_health(
GpsHealth(status=GpsStatus.SPOOFED, fix_age_ms=10, captured_at=0)
)
assert sm.is_spoof_promotion_blocked() is True
assert sm.current_label() == PoseSourceLabel.VISUAL_PROPAGATED
# ---------------------------------------------------------------------
# AC-15: Distance is computed via WgsConverter (geodetic), not haversine
# on the equirectangular projection.
def test_ac15_geodetic_distance_at_60n_within_half_meter() -> None:
"""AC-15 — distance computed via WgsConverter agrees with the WGS-84
geodesic (pyproj.Geod inverse) within 0.5 m at 60° N for a ~200 m
east-west offset.
The ground truth is the WGS-84 ellipsoid Vincenty distance from
pyproj's Geod, NOT a haversine-on-equirectangular shortcut — the
AC explicitly excludes the latter shortcut.
"""
from pyproj import Geod
origin = LatLonAlt(lat_deg=60.0, lon_deg=10.0, alt_m=0.0)
# Construct a sample that is approximately 200 m due east using the
# spherical estimate; the test only requires agreement with the
# ellipsoid Vincenty distance, NOT 200 m exactly.
earth_radius_m = 6_378_137.0
metres_per_degree_lon = (
math.pi * earth_radius_m * math.cos(math.radians(60.0)) / 180.0
)
sample = LatLonAlt(
lat_deg=60.0,
lon_deg=10.0 + 200.0 / metres_per_degree_lon,
alt_m=0.0,
)
geod = Geod(ellps="WGS84")
_az_fwd, _az_back, ground_truth_m = geod.inv(
origin.lon_deg, origin.lat_deg, sample.lon_deg, sample.lat_deg
)
distance = WgsConverter.horizontal_distance_m(origin, sample)
assert abs(distance - ground_truth_m) < 0.5
# Symmetry — Principle of geodesic invariance.
distance_reverse = WgsConverter.horizontal_distance_m(sample, origin)
assert abs(distance - distance_reverse) < 1e-6
# ---------------------------------------------------------------------
# Bounded-delta gate ignores no-smoother case.
def test_bounded_delta_no_smoother_returns_none() -> None:
sm, fdr = _make_sm(bounded_delta_m=200.0)
sample = LatLonAlt(lat_deg=50.0, lon_deg=36.0, alt_m=200.0)
result = sm.process_gps_sample(sample, smoother_estimate=None)
assert result is None
# No FDR record either way
assert fdr.enqueue.call_count == 0
# ---------------------------------------------------------------------
# Estimator.notify_gps_sample — happy-path delegation.
def test_eskf_notify_gps_sample_delegates_to_state_machine() -> None:
# Arrange — seed an operator origin so the smoother latlon is real
estimator = _build_eskf()
estimator.set_takeoff_origin(_origin(), sigma_horiz_m=5.0, sigma_vert_m=10.0)
# Sample within the 200 m ring of the origin
sample = GpsSample(
position_wgs84=LatLonAlt(
lat_deg=_origin().lat_deg + 0.0008,
lon_deg=_origin().lon_deg + 0.0008,
alt_m=200.0,
),
captured_at=1_000_000_000,
)
# Act
result = estimator.notify_gps_sample(sample)
# Assert
assert result == BOUNDED_DELTA_SOFT
accepts = _bounded_delta_records(
estimator._fdr_client, kind="c5.gps_bounded_delta.accept"
)
assert len(accepts) == 1
@@ -93,6 +93,36 @@ def _kind_payload(kind: str) -> dict[str, object]:
"rollover_count": 0,
"clean_shutdown": True,
}
# AZ-490 / E-C5: operator-origin cold-start ladder + bounded-delta GPS gate.
if kind == "c5.cold_start_origin.set":
return {
"source": "manifest",
"lat_deg": 50.0,
"lon_deg": 36.2,
"alt_m": 200.0,
"sigma_horiz_m": 5.0,
"sigma_vert_m": 10.0,
}
if kind == "c5.cold_start_origin.unavailable":
return {"reason": "no_manifest_origin_no_fc_ekf"}
if kind == "c5.gps_bounded_delta.accept":
return {
"sample_lat": 50.0,
"sample_lon": 36.2,
"smoother_lat": 50.0001,
"smoother_lon": 36.2,
"distance_m": 11.1,
"threshold_m": 200.0,
}
if kind == "c5.gps_bounded_delta.reject":
return {
"sample_lat": 50.0,
"sample_lon": 36.2,
"smoother_lat": 50.005,
"smoother_lon": 36.205,
"distance_m": 700.0,
"threshold_m": 200.0,
}
raise AssertionError(f"unhandled kind in fixture: {kind!r}")