From 9d5337ad72435278aacc2faf225f9fa5423c482e Mon Sep 17 00:00:00 2001 From: Yuzviak Date: Wed, 1 Apr 2026 23:38:47 +0300 Subject: [PATCH] feat(eskf): implement 15-state ESKF core algorithm (ESKF-01..05) IMU prediction (F/Q matrices, bias propagation), VO measurement update, satellite measurement update, GPS initialization, and confidence tier computation. NumPy only, ENU convention. Co-Authored-By: Claude Sonnet 4.6 --- src/gps_denied/core/eskf.py | 359 ++++++++++++++++++++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 src/gps_denied/core/eskf.py diff --git a/src/gps_denied/core/eskf.py b/src/gps_denied/core/eskf.py new file mode 100644 index 0000000..46e4308 --- /dev/null +++ b/src/gps_denied/core/eskf.py @@ -0,0 +1,359 @@ +"""Error-State Kalman Filter (ESKF) — 15-state sensor fusion (Component F17).""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import numpy as np + +from gps_denied.schemas import GPSPoint +from gps_denied.schemas.eskf import ( + ConfidenceTier, + ESKFConfig, + ESKFState, + IMUMeasurement, +) + +if TYPE_CHECKING: + from gps_denied.core.coordinates import CoordinateTransformer + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Module-level helpers (private) +# --------------------------------------------------------------------------- + +def _quat_to_rotation_matrix(q: np.ndarray) -> np.ndarray: + """Convert [w, x, y, z] quaternion to 3x3 rotation matrix. + + Uses: R = (2w^2 - 1)*I + 2*(v*v^T) + 2*w*skew(v) + where v = [x, y, z]. + """ + w, x, y, z = q + v = np.array([x, y, z]) + R = (2 * w * w - 1) * np.eye(3) + 2 * np.outer(v, v) + 2 * w * _skew_symmetric(v) + return R + + +def _quat_multiply(q1: np.ndarray, q2: np.ndarray) -> np.ndarray: + """Hamilton product of two [w, x, y, z] quaternions.""" + w1, x1, y1, z1 = q1 + w2, x2, y2, z2 = q2 + return np.array([ + w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2, + w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2, + w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2, + w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2, + ]) + + +def _exp_quaternion(theta: np.ndarray) -> np.ndarray: + """Convert rotation vector (3,) to quaternion [w, x, y, z]. + + Small-angle safe: returns identity quaternion for ||theta|| < 1e-10. + """ + angle = np.linalg.norm(theta) + if angle < 1e-10: + return np.array([1.0, 0.0, 0.0, 0.0]) + axis = theta / angle + return np.array([np.cos(angle / 2), *(np.sin(angle / 2) * axis)]) + + +def _skew_symmetric(v: np.ndarray) -> np.ndarray: + """Return 3x3 skew-symmetric matrix from 3-vector.""" + x, y, z = v + return np.array([ + [0.0, -z, y], + [z, 0.0, -x], + [-y, x, 0.0], + ]) + + +# --------------------------------------------------------------------------- +# ESKF implementation +# --------------------------------------------------------------------------- + +class ESKF: + """15-state Error-State Kalman Filter. + + State vector (error-state, 15-dim): + delta_x[0:3] — position error (ENU, meters) + delta_x[3:6] — velocity error (ENU, m/s) + delta_x[6:9] — attitude error (angle-axis, radians) + delta_x[9:12] — accelerometer bias error (m/s^2) + delta_x[12:15] — gyroscope bias error (rad/s) + + Nominal state uses ENU convention to match existing CoordinateTransformer. + (solution.md uses NED; we use ENU for compatibility with the codebase.) + Gravity: g = [0, 0, -9.81] in ENU (Up is positive, gravity points down). + """ + + def __init__(self, config: ESKFConfig | None = None) -> None: + self.config = config or ESKFConfig() + self._initialized: bool = False + self._nominal_state: dict | None = None + self._P: np.ndarray | None = None # (15, 15) covariance + self._last_satellite_time: float | None = None + self._last_vo_time: float | None = None + self._last_timestamp: float | None = None + + # ------------------------------------------------------------------ + # Initialization + # ------------------------------------------------------------------ + + def initialize( + self, + position_enu: np.ndarray, + timestamp: float, + velocity: np.ndarray | None = None, + quaternion: np.ndarray | None = None, + ) -> None: + """Initialize nominal state with given ENU position. + + Sets a high-uncertainty diagonal covariance per ESKF-04. + """ + cfg = self.config + self._nominal_state = { + "position": np.array(position_enu, dtype=float), + "velocity": np.array(velocity, dtype=float) if velocity is not None else np.zeros(3), + "quaternion": np.array(quaternion, dtype=float) if quaternion is not None else np.array([1.0, 0.0, 0.0, 0.0]), + "accel_bias": np.zeros(3), + "gyro_bias": np.zeros(3), + } + diag = ( + [cfg.init_pos_var] * 3 + + [cfg.init_vel_var] * 3 + + [cfg.init_att_var] * 3 + + [cfg.init_accel_bias_var] * 3 + + [cfg.init_gyro_bias_var] * 3 + ) + self._P = np.diag(diag).astype(float) + self._initialized = True + self._last_timestamp = timestamp + + def initialize_from_gps( + self, + gps: GPSPoint, + altitude: float, + timestamp: float, + coord_transformer: CoordinateTransformer, + flight_id: str, + ) -> None: + """Initialize state from a GPS fix (GLOBAL_POSITION_INT). + + Stores position in ENU (not NED) to be compatible with CoordinateTransformer. + Per ESKF-04: high-uncertainty covariance reflects poor initial estimate. + """ + east, north, _ = coord_transformer.gps_to_enu(flight_id, gps) + position_enu = np.array([east, north, altitude]) + self.initialize(position_enu, timestamp) + + # ------------------------------------------------------------------ + # IMU prediction (ESKF-01) + # ------------------------------------------------------------------ + + def predict(self, imu: IMUMeasurement) -> None: + """Propagate nominal state and covariance using an IMU measurement. + + Implements discrete-time ESKF prediction: + - Corrects for estimated biases + - Propagates position, velocity, quaternion + - Builds F (state transition) and Q (process noise) matrices + - Covariance: P = F @ P @ F.T + Q + """ + if not self._initialized or self._nominal_state is None: + return + dt = imu.timestamp - self._last_timestamp + if dt <= 0 or dt > 1.0: + logger.debug("Skipping IMU prediction: dt=%.4f", dt) + return + + cfg = self.config + ns = self._nominal_state + + # Bias-corrected measurements + a_c = imu.accel - ns["accel_bias"] + w_c = imu.gyro - ns["gyro_bias"] + + R = _quat_to_rotation_matrix(ns["quaternion"]) + + # Gravity in ENU: up is +Z, so gravity is -Z + g = np.array([0.0, 0.0, -9.81]) + + # Propagate nominal state + ns["position"] = ns["position"] + ns["velocity"] * dt + ns["velocity"] = ns["velocity"] + (R @ a_c + g) * dt + q_delta = _exp_quaternion(w_c * dt) + ns["quaternion"] = _quat_multiply(ns["quaternion"], q_delta) + ns["quaternion"] /= np.linalg.norm(ns["quaternion"]) + # biases: unchanged (random walk captured in Q) + + # --- Build 15x15 state transition matrix F --- + F = np.eye(15) + F[0:3, 3:6] = np.eye(3) * dt # p <- v + F[3:6, 6:9] = -R @ _skew_symmetric(a_c) * dt # v <- delta_theta + F[3:6, 9:12] = -R * dt # v <- b_a + F[6:9, 12:15] = -np.eye(3) * dt # theta <- b_g + + # --- Build 15x15 process noise Q --- + Q = np.zeros((15, 15)) + Q[3:6, 3:6] = (cfg.accel_noise_density ** 2) * dt * np.eye(3) + Q[6:9, 6:9] = (cfg.gyro_noise_density ** 2) * dt * np.eye(3) + Q[9:12, 9:12] = (cfg.accel_random_walk ** 2) * dt * np.eye(3) + Q[12:15, 12:15] = (cfg.gyro_random_walk ** 2) * dt * np.eye(3) + + self._P = F @ self._P @ F.T + Q + self._last_timestamp = imu.timestamp + + # ------------------------------------------------------------------ + # VO measurement update (ESKF-02) + # ------------------------------------------------------------------ + + def update_vo( + self, + relative_position: np.ndarray, + dt_vo: float, + ) -> np.ndarray: + """Apply VO relative-position measurement update. + + H_vo directly observes position error (displacement between frames). + Returns the innovation vector z (3,). + """ + if not self._initialized or self._nominal_state is None: + return np.zeros(3) + + ns = self._nominal_state + + # Predicted relative displacement from velocity integration + predicted_relative = ns["velocity"] * dt_vo + z = relative_position - predicted_relative # innovation (3,) + + # H_vo: relative position ~ position error (simplified direct observation) + H_vo = np.zeros((3, 15)) + H_vo[0:3, 0:3] = np.eye(3) + + R_vo = (self.config.vo_position_noise ** 2) * np.eye(3) + S = H_vo @ self._P @ H_vo.T + R_vo + K = self._P @ H_vo.T @ np.linalg.inv(S) + + delta_x = K @ z # (15,) + self._apply_correction(delta_x) + + self._P = (np.eye(15) - K @ H_vo) @ self._P + self._last_vo_time = self._last_timestamp + return z + + # ------------------------------------------------------------------ + # Satellite measurement update (ESKF-03) + # ------------------------------------------------------------------ + + def update_satellite( + self, + position_enu: np.ndarray, + noise_meters: float, + ) -> np.ndarray: + """Apply absolute position update from satellite image matching. + + H_sat directly observes position. + noise_meters is derived from RANSAC inlier ratio by the caller. + Returns the innovation vector z (3,). + """ + if not self._initialized or self._nominal_state is None: + return np.zeros(3) + + ns = self._nominal_state + z = position_enu - ns["position"] # innovation (3,) + + H_sat = np.zeros((3, 15)) + H_sat[0:3, 0:3] = np.eye(3) # directly observes position + + R_sat = (noise_meters ** 2) * np.eye(3) + S = H_sat @ self._P @ H_sat.T + R_sat + K = self._P @ H_sat.T @ np.linalg.inv(S) + + delta_x = K @ z # (15,) + self._apply_correction(delta_x) + + self._P = (np.eye(15) - K @ H_sat) @ self._P + self._last_satellite_time = self._last_timestamp + return z + + # ------------------------------------------------------------------ + # Confidence tier (ESKF-05) + # ------------------------------------------------------------------ + + def get_confidence(self, consecutive_failures: int = 0) -> ConfidenceTier: + """Return confidence tier based on measurement recency and covariance. + + Tiers: + FAILED — 3+ consecutive total failures + HIGH — satellite correction recent (<30s) + covariance small (<400 m^2) + MEDIUM — VO tracking recent (<5s) but no satellite + LOW — IMU-only (no VO, no satellite) + """ + if consecutive_failures >= 3: + return ConfidenceTier.FAILED + + if self._last_timestamp is None or self._P is None: + return ConfidenceTier.LOW + + cfg = self.config + current_time = self._last_timestamp + pos_cov_trace = float(np.trace(self._P[0:3, 0:3])) + + if ( + self._last_satellite_time is not None + and (current_time - self._last_satellite_time) < cfg.satellite_max_age + and pos_cov_trace < cfg.covariance_high_threshold + ): + return ConfidenceTier.HIGH + + if ( + self._last_vo_time is not None + and (current_time - self._last_vo_time) < 5.0 + ): + return ConfidenceTier.MEDIUM + + return ConfidenceTier.LOW + + # ------------------------------------------------------------------ + # State accessor + # ------------------------------------------------------------------ + + def get_state(self) -> ESKFState: + """Return a snapshot of the current nominal state.""" + ns = self._nominal_state or {} + return ESKFState( + position=ns.get("position", np.zeros(3)).copy(), + velocity=ns.get("velocity", np.zeros(3)).copy(), + quaternion=ns.get("quaternion", np.array([1.0, 0.0, 0.0, 0.0])).copy(), + accel_bias=ns.get("accel_bias", np.zeros(3)).copy(), + gyro_bias=ns.get("gyro_bias", np.zeros(3)).copy(), + covariance=self._P.copy() if self._P is not None else np.zeros((15, 15)), + timestamp=self._last_timestamp or 0.0, + confidence=self.get_confidence(), + last_satellite_time=self._last_satellite_time, + last_vo_time=self._last_vo_time, + ) + + @property + def initialized(self) -> bool: + """True if ESKF has been initialized with a GPS position.""" + return self._initialized + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _apply_correction(self, delta_x: np.ndarray) -> None: + """Apply 15-dim error-state correction to nominal state.""" + ns = self._nominal_state + ns["position"] += delta_x[0:3] + ns["velocity"] += delta_x[3:6] + q_correction = _exp_quaternion(delta_x[6:9]) + ns["quaternion"] = _quat_multiply(ns["quaternion"], q_correction) + ns["quaternion"] /= np.linalg.norm(ns["quaternion"]) + ns["accel_bias"] += delta_x[9:12] + ns["gyro_bias"] += delta_x[12:15]