feat(eskf): implement 15-state ESKF core algorithm (ESKF-01..05)

IMU prediction (F/Q matrices, bias propagation), VO measurement
update, satellite measurement update, GPS initialization, and
confidence tier computation. NumPy only, ENU convention.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Yuzviak
2026-04-01 23:38:47 +03:00
parent 57c7a6b80a
commit 9d5337ad72
+359
View File
@@ -0,0 +1,359 @@
"""Error-State Kalman Filter (ESKF) — 15-state sensor fusion (Component F17)."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import (
ConfidenceTier,
ESKFConfig,
ESKFState,
IMUMeasurement,
)
if TYPE_CHECKING:
from gps_denied.core.coordinates import CoordinateTransformer
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Module-level helpers (private)
# ---------------------------------------------------------------------------
def _quat_to_rotation_matrix(q: np.ndarray) -> np.ndarray:
"""Convert [w, x, y, z] quaternion to 3x3 rotation matrix.
Uses: R = (2w^2 - 1)*I + 2*(v*v^T) + 2*w*skew(v)
where v = [x, y, z].
"""
w, x, y, z = q
v = np.array([x, y, z])
R = (2 * w * w - 1) * np.eye(3) + 2 * np.outer(v, v) + 2 * w * _skew_symmetric(v)
return R
def _quat_multiply(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
"""Hamilton product of two [w, x, y, z] quaternions."""
w1, x1, y1, z1 = q1
w2, x2, y2, z2 = q2
return np.array([
w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2,
w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2,
w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2,
w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2,
])
def _exp_quaternion(theta: np.ndarray) -> np.ndarray:
"""Convert rotation vector (3,) to quaternion [w, x, y, z].
Small-angle safe: returns identity quaternion for ||theta|| < 1e-10.
"""
angle = np.linalg.norm(theta)
if angle < 1e-10:
return np.array([1.0, 0.0, 0.0, 0.0])
axis = theta / angle
return np.array([np.cos(angle / 2), *(np.sin(angle / 2) * axis)])
def _skew_symmetric(v: np.ndarray) -> np.ndarray:
"""Return 3x3 skew-symmetric matrix from 3-vector."""
x, y, z = v
return np.array([
[0.0, -z, y],
[z, 0.0, -x],
[-y, x, 0.0],
])
# ---------------------------------------------------------------------------
# ESKF implementation
# ---------------------------------------------------------------------------
class ESKF:
"""15-state Error-State Kalman Filter.
State vector (error-state, 15-dim):
delta_x[0:3] — position error (ENU, meters)
delta_x[3:6] — velocity error (ENU, m/s)
delta_x[6:9] — attitude error (angle-axis, radians)
delta_x[9:12] — accelerometer bias error (m/s^2)
delta_x[12:15] — gyroscope bias error (rad/s)
Nominal state uses ENU convention to match existing CoordinateTransformer.
(solution.md uses NED; we use ENU for compatibility with the codebase.)
Gravity: g = [0, 0, -9.81] in ENU (Up is positive, gravity points down).
"""
def __init__(self, config: ESKFConfig | None = None) -> None:
self.config = config or ESKFConfig()
self._initialized: bool = False
self._nominal_state: dict | None = None
self._P: np.ndarray | None = None # (15, 15) covariance
self._last_satellite_time: float | None = None
self._last_vo_time: float | None = None
self._last_timestamp: float | None = None
# ------------------------------------------------------------------
# Initialization
# ------------------------------------------------------------------
def initialize(
self,
position_enu: np.ndarray,
timestamp: float,
velocity: np.ndarray | None = None,
quaternion: np.ndarray | None = None,
) -> None:
"""Initialize nominal state with given ENU position.
Sets a high-uncertainty diagonal covariance per ESKF-04.
"""
cfg = self.config
self._nominal_state = {
"position": np.array(position_enu, dtype=float),
"velocity": np.array(velocity, dtype=float) if velocity is not None else np.zeros(3),
"quaternion": np.array(quaternion, dtype=float) if quaternion is not None else np.array([1.0, 0.0, 0.0, 0.0]),
"accel_bias": np.zeros(3),
"gyro_bias": np.zeros(3),
}
diag = (
[cfg.init_pos_var] * 3
+ [cfg.init_vel_var] * 3
+ [cfg.init_att_var] * 3
+ [cfg.init_accel_bias_var] * 3
+ [cfg.init_gyro_bias_var] * 3
)
self._P = np.diag(diag).astype(float)
self._initialized = True
self._last_timestamp = timestamp
def initialize_from_gps(
self,
gps: GPSPoint,
altitude: float,
timestamp: float,
coord_transformer: CoordinateTransformer,
flight_id: str,
) -> None:
"""Initialize state from a GPS fix (GLOBAL_POSITION_INT).
Stores position in ENU (not NED) to be compatible with CoordinateTransformer.
Per ESKF-04: high-uncertainty covariance reflects poor initial estimate.
"""
east, north, _ = coord_transformer.gps_to_enu(flight_id, gps)
position_enu = np.array([east, north, altitude])
self.initialize(position_enu, timestamp)
# ------------------------------------------------------------------
# IMU prediction (ESKF-01)
# ------------------------------------------------------------------
def predict(self, imu: IMUMeasurement) -> None:
"""Propagate nominal state and covariance using an IMU measurement.
Implements discrete-time ESKF prediction:
- Corrects for estimated biases
- Propagates position, velocity, quaternion
- Builds F (state transition) and Q (process noise) matrices
- Covariance: P = F @ P @ F.T + Q
"""
if not self._initialized or self._nominal_state is None:
return
dt = imu.timestamp - self._last_timestamp
if dt <= 0 or dt > 1.0:
logger.debug("Skipping IMU prediction: dt=%.4f", dt)
return
cfg = self.config
ns = self._nominal_state
# Bias-corrected measurements
a_c = imu.accel - ns["accel_bias"]
w_c = imu.gyro - ns["gyro_bias"]
R = _quat_to_rotation_matrix(ns["quaternion"])
# Gravity in ENU: up is +Z, so gravity is -Z
g = np.array([0.0, 0.0, -9.81])
# Propagate nominal state
ns["position"] = ns["position"] + ns["velocity"] * dt
ns["velocity"] = ns["velocity"] + (R @ a_c + g) * dt
q_delta = _exp_quaternion(w_c * dt)
ns["quaternion"] = _quat_multiply(ns["quaternion"], q_delta)
ns["quaternion"] /= np.linalg.norm(ns["quaternion"])
# biases: unchanged (random walk captured in Q)
# --- Build 15x15 state transition matrix F ---
F = np.eye(15)
F[0:3, 3:6] = np.eye(3) * dt # p <- v
F[3:6, 6:9] = -R @ _skew_symmetric(a_c) * dt # v <- delta_theta
F[3:6, 9:12] = -R * dt # v <- b_a
F[6:9, 12:15] = -np.eye(3) * dt # theta <- b_g
# --- Build 15x15 process noise Q ---
Q = np.zeros((15, 15))
Q[3:6, 3:6] = (cfg.accel_noise_density ** 2) * dt * np.eye(3)
Q[6:9, 6:9] = (cfg.gyro_noise_density ** 2) * dt * np.eye(3)
Q[9:12, 9:12] = (cfg.accel_random_walk ** 2) * dt * np.eye(3)
Q[12:15, 12:15] = (cfg.gyro_random_walk ** 2) * dt * np.eye(3)
self._P = F @ self._P @ F.T + Q
self._last_timestamp = imu.timestamp
# ------------------------------------------------------------------
# VO measurement update (ESKF-02)
# ------------------------------------------------------------------
def update_vo(
self,
relative_position: np.ndarray,
dt_vo: float,
) -> np.ndarray:
"""Apply VO relative-position measurement update.
H_vo directly observes position error (displacement between frames).
Returns the innovation vector z (3,).
"""
if not self._initialized or self._nominal_state is None:
return np.zeros(3)
ns = self._nominal_state
# Predicted relative displacement from velocity integration
predicted_relative = ns["velocity"] * dt_vo
z = relative_position - predicted_relative # innovation (3,)
# H_vo: relative position ~ position error (simplified direct observation)
H_vo = np.zeros((3, 15))
H_vo[0:3, 0:3] = np.eye(3)
R_vo = (self.config.vo_position_noise ** 2) * np.eye(3)
S = H_vo @ self._P @ H_vo.T + R_vo
K = self._P @ H_vo.T @ np.linalg.inv(S)
delta_x = K @ z # (15,)
self._apply_correction(delta_x)
self._P = (np.eye(15) - K @ H_vo) @ self._P
self._last_vo_time = self._last_timestamp
return z
# ------------------------------------------------------------------
# Satellite measurement update (ESKF-03)
# ------------------------------------------------------------------
def update_satellite(
self,
position_enu: np.ndarray,
noise_meters: float,
) -> np.ndarray:
"""Apply absolute position update from satellite image matching.
H_sat directly observes position.
noise_meters is derived from RANSAC inlier ratio by the caller.
Returns the innovation vector z (3,).
"""
if not self._initialized or self._nominal_state is None:
return np.zeros(3)
ns = self._nominal_state
z = position_enu - ns["position"] # innovation (3,)
H_sat = np.zeros((3, 15))
H_sat[0:3, 0:3] = np.eye(3) # directly observes position
R_sat = (noise_meters ** 2) * np.eye(3)
S = H_sat @ self._P @ H_sat.T + R_sat
K = self._P @ H_sat.T @ np.linalg.inv(S)
delta_x = K @ z # (15,)
self._apply_correction(delta_x)
self._P = (np.eye(15) - K @ H_sat) @ self._P
self._last_satellite_time = self._last_timestamp
return z
# ------------------------------------------------------------------
# Confidence tier (ESKF-05)
# ------------------------------------------------------------------
def get_confidence(self, consecutive_failures: int = 0) -> ConfidenceTier:
"""Return confidence tier based on measurement recency and covariance.
Tiers:
FAILED — 3+ consecutive total failures
HIGH — satellite correction recent (<30s) + covariance small (<400 m^2)
MEDIUM — VO tracking recent (<5s) but no satellite
LOW — IMU-only (no VO, no satellite)
"""
if consecutive_failures >= 3:
return ConfidenceTier.FAILED
if self._last_timestamp is None or self._P is None:
return ConfidenceTier.LOW
cfg = self.config
current_time = self._last_timestamp
pos_cov_trace = float(np.trace(self._P[0:3, 0:3]))
if (
self._last_satellite_time is not None
and (current_time - self._last_satellite_time) < cfg.satellite_max_age
and pos_cov_trace < cfg.covariance_high_threshold
):
return ConfidenceTier.HIGH
if (
self._last_vo_time is not None
and (current_time - self._last_vo_time) < 5.0
):
return ConfidenceTier.MEDIUM
return ConfidenceTier.LOW
# ------------------------------------------------------------------
# State accessor
# ------------------------------------------------------------------
def get_state(self) -> ESKFState:
"""Return a snapshot of the current nominal state."""
ns = self._nominal_state or {}
return ESKFState(
position=ns.get("position", np.zeros(3)).copy(),
velocity=ns.get("velocity", np.zeros(3)).copy(),
quaternion=ns.get("quaternion", np.array([1.0, 0.0, 0.0, 0.0])).copy(),
accel_bias=ns.get("accel_bias", np.zeros(3)).copy(),
gyro_bias=ns.get("gyro_bias", np.zeros(3)).copy(),
covariance=self._P.copy() if self._P is not None else np.zeros((15, 15)),
timestamp=self._last_timestamp or 0.0,
confidence=self.get_confidence(),
last_satellite_time=self._last_satellite_time,
last_vo_time=self._last_vo_time,
)
@property
def initialized(self) -> bool:
"""True if ESKF has been initialized with a GPS position."""
return self._initialized
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _apply_correction(self, delta_x: np.ndarray) -> None:
"""Apply 15-dim error-state correction to nominal state."""
ns = self._nominal_state
ns["position"] += delta_x[0:3]
ns["velocity"] += delta_x[3:6]
q_correction = _exp_quaternion(delta_x[6:9])
ns["quaternion"] = _quat_multiply(ns["quaternion"], q_correction)
ns["quaternion"] /= np.linalg.norm(ns["quaternion"])
ns["accel_bias"] += delta_x[9:12]
ns["gyro_bias"] += delta_x[12:15]