"""Tests for ESKF (F17) — Error-State Kalman Filter.""" import numpy as np import pytest from gps_denied.core.coordinates import CoordinateTransformer from gps_denied.core.eskf import ESKF from gps_denied.schemas import GPSPoint from gps_denied.schemas.eskf import ConfidenceTier, ESKFConfig, IMUMeasurement @pytest.fixture def eskf(): """ESKF initialized at origin with default config.""" e = ESKF() e.initialize(np.zeros(3), timestamp=0.0) return e @pytest.fixture def config(): return ESKFConfig() class TestESKFInitialization: """Tests for ESKF-04: Initialization.""" def test_initialization_default(self, config): """Test basic initialization with position and timestamp.""" e = ESKF() e.initialize(np.array([100.0, 200.0, 0.0]), timestamp=0.0) assert e.initialized state = e.get_state() assert np.allclose(state.position, [100.0, 200.0, 0.0]) assert np.allclose(state.velocity, [0.0, 0.0, 0.0]) assert np.allclose(state.quaternion, [1.0, 0.0, 0.0, 0.0]) assert state.covariance.shape == (15, 15) assert state.covariance[0, 0] == config.init_pos_var assert state.covariance[3, 3] == config.init_vel_var assert state.covariance[6, 6] == config.init_att_var def test_initialization_from_gps(self): """Test initialization from GPS position.""" ct = CoordinateTransformer() ct.set_enu_origin("f1", GPSPoint(lat=48.0, lon=37.0)) e = ESKF() e.initialize_from_gps( GPSPoint(lat=48.001, lon=37.001), altitude=600.0, timestamp=0.0, coord_transformer=ct, flight_id="f1", ) assert e.initialized state = e.get_state() assert not np.allclose(state.position, [0.0, 0.0, 0.0]) assert np.linalg.norm(state.position[:2]) > 50 # ~111m per 0.001 deg class TestESKFPrediction: """Tests for ESKF-01: IMU prediction.""" def test_predict_covariance_grows(self, eskf): """Test that covariance grows during IMU-only prediction.""" P_before = eskf.get_state().covariance.copy() trace_before = np.trace(P_before[0:3, 0:3]) imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01, ) eskf.predict(imu) P_after = eskf.get_state().covariance trace_after = np.trace(P_after[0:3, 0:3]) assert trace_after > trace_before def test_predict_gravity_compensation(self, eskf): """Test that gravity is properly compensated (stationary accel has no velocity growth).""" # 100 steps at 0.01s = 1 second with gravity-aligned acceleration for i in range(100): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) eskf.predict(imu) state = eskf.get_state() # Velocity should remain near zero (gravity compensated) assert np.linalg.norm(state.velocity) < 1.0 def test_predict_with_acceleration(self, eskf): """Test velocity integration from applied acceleration.""" # 100 steps at 0.01s = 1 second with 1 m/s^2 forward + gravity for i in range(100): imu = IMUMeasurement( accel=np.array([1.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) eskf.predict(imu) state = eskf.get_state() # Velocity[0] should be approximately 1.0 m/s assert abs(state.velocity[0] - 1.0) < 0.5 def test_predict_position_propagation(self): """Test position propagation from velocity.""" e = ESKF() e.initialize(np.zeros(3), timestamp=0.0, velocity=np.array([10.0, 0.0, 0.0])) for i in range(100): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) e.predict(imu) state = e.get_state() # Position[0] should be approximately 10.0 meters after 1 second at 10 m/s assert abs(state.position[0] - 10.0) < 2.0 class TestESKFVOUpdate: """Tests for ESKF-02: VO measurement update.""" def test_vo_update_reduces_uncertainty(self, eskf): """Test that VO update reduces position covariance.""" # Grow covariance first for _ in range(5): imu = IMUMeasurement( accel=np.zeros(3), gyro=np.zeros(3), timestamp=eskf._last_timestamp + 0.01, ) eskf.predict(imu) trace_before = np.trace(eskf.get_state().covariance[0:3, 0:3]) eskf.update_vo(np.zeros(3), dt_vo=0.1) trace_after = np.trace(eskf.get_state().covariance[0:3, 0:3]) assert trace_after < trace_before def test_vo_update_corrects_position(self, eskf): """Test that VO update shifts position toward measurement.""" # Predict to drift for i in range(10): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) eskf.predict(imu) pos_before = eskf.get_state().position.copy() # VO measurement with 1m north offset eskf.update_vo(np.array([0.0, 1.0, 0.0]), dt_vo=0.1) pos_after = eskf.get_state().position assert not np.allclose(pos_after, pos_before) def test_vo_update_returns_innovation(self, eskf): """Test that VO update returns innovation vector.""" innovation = eskf.update_vo(np.array([1.0, 0.0, 0.0]), dt_vo=0.1) assert innovation.shape == (3,) assert np.linalg.norm(innovation) > 0.5 class TestESKFSatelliteUpdate: """Tests for ESKF-03: Satellite measurement update.""" def test_satellite_update_corrects_position(self, eskf): """Test that satellite update moves position toward measurement.""" # Predict to drift for i in range(10): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) eskf.predict(imu) pos_before = eskf.get_state().position.copy() # Use target within Mahalanobis gate (small shift relative to P) target_pos = np.array([5.0, 5.0, 0.0]) eskf.update_satellite(target_pos, noise_meters=10.0) pos_after = eskf.get_state().position # Position should move toward target dist_before = np.linalg.norm(pos_before - target_pos) dist_after = np.linalg.norm(pos_after - target_pos) assert dist_after < dist_before def test_satellite_update_tightens_covariance(self, eskf): """Test that satellite update reduces position covariance.""" trace_before = np.trace(eskf.get_state().covariance[0:3, 0:3]) eskf.update_satellite(np.zeros(3), noise_meters=5.0) trace_after = np.trace(eskf.get_state().covariance[0:3, 0:3]) assert trace_after < trace_before def test_satellite_update_covariance_bounded_by_noise(self, eskf): """Test that final covariance is bounded by satellite noise level.""" eskf.update_satellite(np.zeros(3), noise_meters=5.0) pos_cov_trace = np.trace(eskf.get_state().covariance[0:3, 0:3]) # Covariance should be bounded by roughly 3*noise^2 (3 dimensions) assert pos_cov_trace < 3 * (5.0 ** 2) class TestESKFConfidenceTiers: """Tests for ESKF-05: Confidence tier computation.""" def test_confidence_high(self, eskf, config): """Test HIGH confidence with recent satellite and low covariance.""" eskf.update_satellite(np.zeros(3), noise_meters=5.0) # Advance time slightly (still within satellite_max_age) imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=1.0, ) eskf.predict(imu) assert eskf.get_confidence() == ConfidenceTier.HIGH def test_confidence_medium(self): """Test MEDIUM confidence with recent VO but no satellite.""" e = ESKF() e.initialize(np.zeros(3), timestamp=0.0) e.update_vo(np.zeros(3), dt_vo=0.1) assert e.get_confidence() == ConfidenceTier.MEDIUM def test_confidence_low(self, eskf): """Test LOW confidence with no recent measurements.""" # Many predictions without any update for i in range(100): imu = IMUMeasurement( accel=np.zeros(3), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) eskf.predict(imu) assert eskf.get_confidence() == ConfidenceTier.LOW def test_confidence_failed(self, eskf): """Test FAILED confidence with 3+ consecutive failures.""" assert eskf.get_confidence(consecutive_failures=3) == ConfidenceTier.FAILED assert eskf.get_confidence(consecutive_failures=0) != ConfidenceTier.FAILED class TestESKFStateAccess: """Tests for ESKF state accessor.""" def test_get_state_returns_eskf_state(self, eskf): """Test that get_state returns properly formed ESKFState.""" state = eskf.get_state() assert state.position.shape == (3,) assert state.velocity.shape == (3,) assert state.quaternion.shape == (4,) assert state.accel_bias.shape == (3,) assert state.gyro_bias.shape == (3,) assert state.covariance.shape == (15, 15) assert isinstance(state.confidence, ConfidenceTier) class TestESKFFusion: """Integration tests for full ESKF fusion sequence.""" def test_full_fusion_sequence(self): """Test complete IMU → VO → satellite fusion.""" e = ESKF() e.initialize(np.zeros(3), timestamp=0.0) # IMU prediction phase for i in range(10): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 1), ) e.predict(imu) P_before_vo = np.trace(e.get_state().covariance[0:3, 0:3]) # VO update e.update_vo(np.array([0.1, 0.0, 0.0]), dt_vo=0.1) P_after_vo = np.trace(e.get_state().covariance[0:3, 0:3]) assert P_after_vo < P_before_vo # More IMU prediction for i in range(10): imu = IMUMeasurement( accel=np.array([0.0, 0.0, 9.81]), gyro=np.zeros(3), timestamp=0.01 * (i + 11), ) e.predict(imu) # Satellite update e.update_satellite(np.array([1.0, 0.0, 0.0]), noise_meters=5.0) state = e.get_state() # Position should be close to satellite measurement assert np.linalg.norm(state.position[:2] - [1.0, 0.0]) < 10.0 assert state.confidence == ConfidenceTier.HIGH