mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 21:46:36 +00:00
78dcf7b4e7
Phase A — Runtime bugs: - SSE: add push_event() method to SSEEventStreamer (was missing, masked by mocks) - MAVLink: satellites_visible=10 (was 0, triggers ArduPilot failsafe) - MAVLink: horiz_accuracy=sqrt(P[0,0]+P[1,1]) per spec (was sqrt(avg)) - MAVLink: MEDIUM confidence → fix_type=3 per solution.md (was 2) Phase B — Functional gaps: - handle_user_fix() injects operator GPS into ESKF with noise=500m - app.py uses create_vo_backend() factory (was hardcoded SequentialVO) - ESKF: Mahalanobis gating on satellite updates (rejects outliers >5σ) - ESKF: public accessors (position, quaternion, covariance, last_timestamp) - Processor: no more private ESKF field access Phase C — Documentation: - README: correct API endpoints, CLI command, 40+ env vars documented - Dockerfile: ENV prefixes match pydantic-settings (DB_, SATELLITE_, MAVLINK_) - tech_stack.md marked ARCHIVED (contradicts solution.md) Phase D — Hardening: - JWT auth middleware (AUTH_ENABLED=false default, verify_token on /flights) - TLS config env vars (AUTH_SSL_CERTFILE, AUTH_SSL_KEYFILE) - SHA-256 tile manifest verification in SatelliteDataManager - AuthConfig, ESKFSettings, MAVLinkConfig, SatelliteConfig in config.py Also: conftest.py shared fixtures, download_tiles.py, convert_to_trt.py scripts, config wiring into app.py lifespan, config-driven ESKF, calculate_precise_angle fix. Tests: 196 passed / 8 skipped. Ruff clean. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
319 lines
11 KiB
Python
319 lines
11 KiB
Python
"""Tests for ESKF (F17) — Error-State Kalman Filter."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied.core.coordinates import CoordinateTransformer
|
|
from gps_denied.core.eskf import ESKF
|
|
from gps_denied.schemas import GPSPoint
|
|
from gps_denied.schemas.eskf import ConfidenceTier, ESKFConfig, IMUMeasurement
|
|
|
|
|
|
@pytest.fixture
|
|
def eskf():
|
|
"""ESKF initialized at origin with default config."""
|
|
e = ESKF()
|
|
e.initialize(np.zeros(3), timestamp=0.0)
|
|
return e
|
|
|
|
|
|
@pytest.fixture
|
|
def config():
|
|
return ESKFConfig()
|
|
|
|
|
|
class TestESKFInitialization:
|
|
"""Tests for ESKF-04: Initialization."""
|
|
|
|
def test_initialization_default(self, config):
|
|
"""Test basic initialization with position and timestamp."""
|
|
e = ESKF()
|
|
e.initialize(np.array([100.0, 200.0, 0.0]), timestamp=0.0)
|
|
assert e.initialized
|
|
state = e.get_state()
|
|
assert np.allclose(state.position, [100.0, 200.0, 0.0])
|
|
assert np.allclose(state.velocity, [0.0, 0.0, 0.0])
|
|
assert np.allclose(state.quaternion, [1.0, 0.0, 0.0, 0.0])
|
|
assert state.covariance.shape == (15, 15)
|
|
assert state.covariance[0, 0] == config.init_pos_var
|
|
assert state.covariance[3, 3] == config.init_vel_var
|
|
assert state.covariance[6, 6] == config.init_att_var
|
|
|
|
def test_initialization_from_gps(self):
|
|
"""Test initialization from GPS position."""
|
|
ct = CoordinateTransformer()
|
|
ct.set_enu_origin("f1", GPSPoint(lat=48.0, lon=37.0))
|
|
|
|
e = ESKF()
|
|
e.initialize_from_gps(
|
|
GPSPoint(lat=48.001, lon=37.001),
|
|
altitude=600.0,
|
|
timestamp=0.0,
|
|
coord_transformer=ct,
|
|
flight_id="f1",
|
|
)
|
|
assert e.initialized
|
|
state = e.get_state()
|
|
assert not np.allclose(state.position, [0.0, 0.0, 0.0])
|
|
assert np.linalg.norm(state.position[:2]) > 50 # ~111m per 0.001 deg
|
|
|
|
|
|
class TestESKFPrediction:
|
|
"""Tests for ESKF-01: IMU prediction."""
|
|
|
|
def test_predict_covariance_grows(self, eskf):
|
|
"""Test that covariance grows during IMU-only prediction."""
|
|
P_before = eskf.get_state().covariance.copy()
|
|
trace_before = np.trace(P_before[0:3, 0:3])
|
|
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01,
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
P_after = eskf.get_state().covariance
|
|
trace_after = np.trace(P_after[0:3, 0:3])
|
|
assert trace_after > trace_before
|
|
|
|
def test_predict_gravity_compensation(self, eskf):
|
|
"""Test that gravity is properly compensated (stationary accel has no velocity growth)."""
|
|
# 100 steps at 0.01s = 1 second with gravity-aligned acceleration
|
|
for i in range(100):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
state = eskf.get_state()
|
|
# Velocity should remain near zero (gravity compensated)
|
|
assert np.linalg.norm(state.velocity) < 1.0
|
|
|
|
def test_predict_with_acceleration(self, eskf):
|
|
"""Test velocity integration from applied acceleration."""
|
|
# 100 steps at 0.01s = 1 second with 1 m/s^2 forward + gravity
|
|
for i in range(100):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([1.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
state = eskf.get_state()
|
|
# Velocity[0] should be approximately 1.0 m/s
|
|
assert abs(state.velocity[0] - 1.0) < 0.5
|
|
|
|
def test_predict_position_propagation(self):
|
|
"""Test position propagation from velocity."""
|
|
e = ESKF()
|
|
e.initialize(np.zeros(3), timestamp=0.0, velocity=np.array([10.0, 0.0, 0.0]))
|
|
|
|
for i in range(100):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
e.predict(imu)
|
|
|
|
state = e.get_state()
|
|
# Position[0] should be approximately 10.0 meters after 1 second at 10 m/s
|
|
assert abs(state.position[0] - 10.0) < 2.0
|
|
|
|
|
|
class TestESKFVOUpdate:
|
|
"""Tests for ESKF-02: VO measurement update."""
|
|
|
|
def test_vo_update_reduces_uncertainty(self, eskf):
|
|
"""Test that VO update reduces position covariance."""
|
|
# Grow covariance first
|
|
for _ in range(5):
|
|
imu = IMUMeasurement(
|
|
accel=np.zeros(3),
|
|
gyro=np.zeros(3),
|
|
timestamp=eskf._last_timestamp + 0.01,
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
trace_before = np.trace(eskf.get_state().covariance[0:3, 0:3])
|
|
|
|
eskf.update_vo(np.zeros(3), dt_vo=0.1)
|
|
|
|
trace_after = np.trace(eskf.get_state().covariance[0:3, 0:3])
|
|
assert trace_after < trace_before
|
|
|
|
def test_vo_update_corrects_position(self, eskf):
|
|
"""Test that VO update shifts position toward measurement."""
|
|
# Predict to drift
|
|
for i in range(10):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
pos_before = eskf.get_state().position.copy()
|
|
|
|
# VO measurement with 1m north offset
|
|
eskf.update_vo(np.array([0.0, 1.0, 0.0]), dt_vo=0.1)
|
|
|
|
pos_after = eskf.get_state().position
|
|
assert not np.allclose(pos_after, pos_before)
|
|
|
|
def test_vo_update_returns_innovation(self, eskf):
|
|
"""Test that VO update returns innovation vector."""
|
|
innovation = eskf.update_vo(np.array([1.0, 0.0, 0.0]), dt_vo=0.1)
|
|
assert innovation.shape == (3,)
|
|
assert np.linalg.norm(innovation) > 0.5
|
|
|
|
|
|
class TestESKFSatelliteUpdate:
|
|
"""Tests for ESKF-03: Satellite measurement update."""
|
|
|
|
def test_satellite_update_corrects_position(self, eskf):
|
|
"""Test that satellite update moves position toward measurement."""
|
|
# Predict to drift
|
|
for i in range(10):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
pos_before = eskf.get_state().position.copy()
|
|
# Use target within Mahalanobis gate (small shift relative to P)
|
|
target_pos = np.array([5.0, 5.0, 0.0])
|
|
|
|
eskf.update_satellite(target_pos, noise_meters=10.0)
|
|
|
|
pos_after = eskf.get_state().position
|
|
# Position should move toward target
|
|
dist_before = np.linalg.norm(pos_before - target_pos)
|
|
dist_after = np.linalg.norm(pos_after - target_pos)
|
|
assert dist_after < dist_before
|
|
|
|
def test_satellite_update_tightens_covariance(self, eskf):
|
|
"""Test that satellite update reduces position covariance."""
|
|
trace_before = np.trace(eskf.get_state().covariance[0:3, 0:3])
|
|
|
|
eskf.update_satellite(np.zeros(3), noise_meters=5.0)
|
|
|
|
trace_after = np.trace(eskf.get_state().covariance[0:3, 0:3])
|
|
assert trace_after < trace_before
|
|
|
|
def test_satellite_update_covariance_bounded_by_noise(self, eskf):
|
|
"""Test that final covariance is bounded by satellite noise level."""
|
|
eskf.update_satellite(np.zeros(3), noise_meters=5.0)
|
|
|
|
pos_cov_trace = np.trace(eskf.get_state().covariance[0:3, 0:3])
|
|
# Covariance should be bounded by roughly 3*noise^2 (3 dimensions)
|
|
assert pos_cov_trace < 3 * (5.0 ** 2)
|
|
|
|
|
|
class TestESKFConfidenceTiers:
|
|
"""Tests for ESKF-05: Confidence tier computation."""
|
|
|
|
def test_confidence_high(self, eskf, config):
|
|
"""Test HIGH confidence with recent satellite and low covariance."""
|
|
eskf.update_satellite(np.zeros(3), noise_meters=5.0)
|
|
|
|
# Advance time slightly (still within satellite_max_age)
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=1.0,
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
assert eskf.get_confidence() == ConfidenceTier.HIGH
|
|
|
|
def test_confidence_medium(self):
|
|
"""Test MEDIUM confidence with recent VO but no satellite."""
|
|
e = ESKF()
|
|
e.initialize(np.zeros(3), timestamp=0.0)
|
|
|
|
e.update_vo(np.zeros(3), dt_vo=0.1)
|
|
assert e.get_confidence() == ConfidenceTier.MEDIUM
|
|
|
|
def test_confidence_low(self, eskf):
|
|
"""Test LOW confidence with no recent measurements."""
|
|
# Many predictions without any update
|
|
for i in range(100):
|
|
imu = IMUMeasurement(
|
|
accel=np.zeros(3),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
eskf.predict(imu)
|
|
|
|
assert eskf.get_confidence() == ConfidenceTier.LOW
|
|
|
|
def test_confidence_failed(self, eskf):
|
|
"""Test FAILED confidence with 3+ consecutive failures."""
|
|
assert eskf.get_confidence(consecutive_failures=3) == ConfidenceTier.FAILED
|
|
assert eskf.get_confidence(consecutive_failures=0) != ConfidenceTier.FAILED
|
|
|
|
|
|
class TestESKFStateAccess:
|
|
"""Tests for ESKF state accessor."""
|
|
|
|
def test_get_state_returns_eskf_state(self, eskf):
|
|
"""Test that get_state returns properly formed ESKFState."""
|
|
state = eskf.get_state()
|
|
assert state.position.shape == (3,)
|
|
assert state.velocity.shape == (3,)
|
|
assert state.quaternion.shape == (4,)
|
|
assert state.accel_bias.shape == (3,)
|
|
assert state.gyro_bias.shape == (3,)
|
|
assert state.covariance.shape == (15, 15)
|
|
assert isinstance(state.confidence, ConfidenceTier)
|
|
|
|
|
|
class TestESKFFusion:
|
|
"""Integration tests for full ESKF fusion sequence."""
|
|
|
|
def test_full_fusion_sequence(self):
|
|
"""Test complete IMU → VO → satellite fusion."""
|
|
e = ESKF()
|
|
e.initialize(np.zeros(3), timestamp=0.0)
|
|
|
|
# IMU prediction phase
|
|
for i in range(10):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 1),
|
|
)
|
|
e.predict(imu)
|
|
|
|
P_before_vo = np.trace(e.get_state().covariance[0:3, 0:3])
|
|
|
|
# VO update
|
|
e.update_vo(np.array([0.1, 0.0, 0.0]), dt_vo=0.1)
|
|
|
|
P_after_vo = np.trace(e.get_state().covariance[0:3, 0:3])
|
|
assert P_after_vo < P_before_vo
|
|
|
|
# More IMU prediction
|
|
for i in range(10):
|
|
imu = IMUMeasurement(
|
|
accel=np.array([0.0, 0.0, 9.81]),
|
|
gyro=np.zeros(3),
|
|
timestamp=0.01 * (i + 11),
|
|
)
|
|
e.predict(imu)
|
|
|
|
# Satellite update
|
|
e.update_satellite(np.array([1.0, 0.0, 0.0]), noise_meters=5.0)
|
|
|
|
state = e.get_state()
|
|
# Position should be close to satellite measurement
|
|
assert np.linalg.norm(state.position[:2] - [1.0, 0.0]) < 10.0
|
|
assert state.confidence == ConfidenceTier.HIGH
|