diff --git a/.gitignore b/.gitignore index db5f21c..6d5583a 100644 --- a/.gitignore +++ b/.gitignore @@ -65,6 +65,9 @@ fdr_output/ tile_cache/ e2e-results/ +# Local scratch / one-off diagnostics +_scratch/ + # Secrets .env .env.local diff --git a/_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md b/_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md index a32f30a..a025e60 100644 --- a/_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md +++ b/_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md @@ -127,6 +127,12 @@ Lives at `src/gps_denied_onboard/runtime_root/vio_factory.py`. Selects the strat - **6×6 SPD covariance always returned**: `pose_covariance_6x6` is symmetric and positive-definite for every `VioOutput`. Implementations MUST NOT return a "tightened" covariance (smaller Frobenius norm) during a degradation event; honest covariance is the safety floor for AC-NEW-4 and AC-NEW-7. A test (covariance-monotonicity contract test, deferred to Step 9 / E-BBT) asserts this across all three strategies. - **`frame_id` echo**: `VioOutput.frame_id` equals the input `NavCameraFrame.frame_id`. C5 relies on this for time-aligned factor insertion. +- **`relative_pose_T.translation()` is in metres** (NOT pixels, NOT unit-length). Every strategy MUST emit metric translation; C5 fuses it directly into the state estimator without further scaling. Monocular strategies (KLT/RANSAC) recover scale through an injected `AltitudeProvider` (see AZ-919/AZ-920); stereo / VIO strategies (OKVIS2, VINS-Mono) get scale from their backend optimization. +- **`scale_quality` carries the per-frame degraded-mode signal** (AZ-921). Three values: + - `"metric"` — translation is in metres, fully trustworthy. ESKF consumes `pose_covariance_6x6` as-is. + - `"direction_only"` — translation direction is informative but magnitude is not (near-vertical motion in a nadir camera; banked turn). ESKF overrides `R_meas[0:3, 0:3]` to `_DIRECTION_ONLY_TRANSLATION_SIGMA_M² = 64 m²` so the rotation update is honoured and the position update contributes little. + - `"unknown"` — translation is not trustworthy at all (AGL missing, zero inlier flow, hover, stationary). ESKF overrides `R_meas[0:3, 0:3]` to `_UNKNOWN_TRANSLATION_SIGMA_M² = 1e6 m²` so the position update is effectively skipped while the rotation update remains active. + Default `"unknown"` on the `VioOutput` dataclass keeps legacy strategies bug-for-bug compatible until they opt in to the AZ-919 `AltitudeProvider` plumbing. - **Single-threaded by contract**: each `VioStrategy` instance is bound to one writer thread (the camera ingest thread). Concurrent calls to `process_frame` on the same instance are undefined behaviour. The composition root binds one instance per ingest thread. - **`reset_to_warm_start` is destructive**: clears the strategy's keyframe window, IMU integration state, and feature track buffer; subsequent `process_frame` calls re-initialise from the hint. Calling `reset_to_warm_start` mid-flight is allowed (F8 reboot recovery) but must not be issued concurrently with a `process_frame` call on the same instance. - **`current_strategy_label()` is constant per instance**: returns the same string for the lifetime of the instance and matches `config.vio.strategy` exactly. The label is FDR-stamped on every `VioHealth` event for AC-NEW-3 audit. diff --git a/_docs/02_document/contracts/replay/csv_replay_format.md b/_docs/02_document/contracts/replay/csv_replay_format.md index bde20e5..e7e51c7 100644 --- a/_docs/02_document/contracts/replay/csv_replay_format.md +++ b/_docs/02_document/contracts/replay/csv_replay_format.md @@ -47,16 +47,24 @@ but the column **names** must match exactly (case-sensitive). ### Required columns -| # | Column | Unit | Type | Notes | -|---|--------|------|------|-------| +CSV columns use the MAVLink wire format (mG accel, mrad/s gyro, FRD +body frame). The parser converts to SI / FLU at the `ImuSample` +boundary via +`gps_denied_onboard.helpers.imu_units.mavlink_imu_to_si_flu` (AZ-918) +so downstream C5 ESKF + `imu_preintegrator` consumers see the contract +they were built for. **Operator-facing CSV files keep the raw scaling** +— the conversion is a parser-internal concern. + +| # | Column | Unit (CSV) | Type | Notes | +|---|--------|------------|------|-------| | 1 | `timestamp(ms)` | ms | float | Pixhawk wall clock at sample capture. **Ignored by the replay pipeline** — kept only for trace-back to the original tlog. | | 2 | `Time` | s | float | **Canonical replay clock.** Must start at `0.0`, increase monotonically, and be uniformly spaced. The replay loop uses this column for every timestamp it emits. | -| 3 | `SCALED_IMU2.xacc` | mg | float | Body-frame X accelerometer, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.accel_xyz[0]`. | -| 4 | `SCALED_IMU2.yacc` | mg | float | Body-frame Y accelerometer. | -| 5 | `SCALED_IMU2.zacc` | mg | float | Body-frame Z accelerometer. | -| 6 | `SCALED_IMU2.xgyro` | mrad/s | float | Body-frame X gyro, MAVLink `SCALED_IMU2` raw scaling. Forwarded unchanged into `ImuSample.gyro_xyz[0]`. | -| 7 | `SCALED_IMU2.ygyro` | mrad/s | float | Body-frame Y gyro. | -| 8 | `SCALED_IMU2.zgyro` | mrad/s | float | Body-frame Z gyro. | +| 3 | `SCALED_IMU2.xacc` | mg, FRD | float | Body-frame X accelerometer, MAVLink `SCALED_IMU2` raw scaling. Converted by the parser to m/s² in `ImuSample.accel_xyz[0]` (FLU body). | +| 4 | `SCALED_IMU2.yacc` | mg, FRD | float | Body-frame Y accelerometer; sign-flipped during FRD→FLU. | +| 5 | `SCALED_IMU2.zacc` | mg, FRD | float | Body-frame Z accelerometer; sign-flipped during FRD→FLU. | +| 6 | `SCALED_IMU2.xgyro` | mrad/s, FRD | float | Body-frame X gyro, MAVLink `SCALED_IMU2` raw scaling. Converted to rad/s in `ImuSample.gyro_xyz[0]` (FLU body). | +| 7 | `SCALED_IMU2.ygyro` | mrad/s, FRD | float | Body-frame Y gyro; sign-flipped during FRD→FLU. | +| 8 | `SCALED_IMU2.zgyro` | mrad/s, FRD | float | Body-frame Z gyro; sign-flipped during FRD→FLU. | | 9 | `GLOBAL_POSITION_INT.lat` | degrees | float | WGS84 latitude. **Already in decimal degrees** (Derkachi dump convention — pre-divided by 1e7 from MAVLink's int representation). | | 10 | `GLOBAL_POSITION_INT.lon` | degrees | float | WGS84 longitude (same convention as `lat`). | | 11 | `GLOBAL_POSITION_INT.alt` | mm | float | MSL altitude. Parser divides by 1000 to emit metres. | diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 98ac306..30ea4f7 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 10 name: Implement status: in_progress sub_step: - phase: 8 - name: cumulative-review - detail: "batches 01-03 of cycle 4 — first cumulative review pending" + phase: 9 + name: out-of-band-bugfix + detail: "F4 GSD scale recovery in TIER-2 VERIFICATION. AZ-919/920/921 all In Testing in Jira; Tier-1 (macOS) 2331 unit tests green. Jetson harness running 2026-05-27T14:06Z (rsync+image build+e2e). Awaiting pytest summary for the 4 previously-failing Derkachi tests (test_ac1_exits_0_jsonl_count_match, test_ac5_determinism_two_runs_diff, test_ac6_pace_realtime_60s_within_5pct, test_ac6_pace_asap_under_30s). AZ-918 sibling IMU fix is implicitly verified by the same Jetson run." retry_count: 0 cycle: 4 tracker: jira diff --git a/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md b/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md index c425d5d..451d91c 100644 --- a/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md +++ b/_docs/_process_leftovers/2026-05-11_d_cross_cve_1_opencv_pin_deferred.md @@ -1,8 +1,8 @@ # D-CROSS-CVE-1 opencv-python pin deferred — gtsam/numpy ABI block **Recorded**: 2026-05-11T02:55+03:00 (Europe/Kyiv) -**Last replay attempt**: 2026-05-26T18:44+03:00 (Europe/Kyiv) — replay re-checked -at start of next `/autodev` invocation. PyPI re-queried via +**Last replay attempt**: 2026-05-27T11:14+03:00 (Europe/Kyiv) — replay re-checked +before AZ-919/920/921 tracker write. PyPI re-queried via `python3 -m pip index versions gtsam`: only `gtsam 4.2` is published. Replay condition (numpy>=2 stable wheels) still NOT met. Leftover remains open. **Status**: deferred-non-user (replay when upstream gtsam wheels target numpy>=2) diff --git a/src/gps_denied_onboard/_types/fc.py b/src/gps_denied_onboard/_types/fc.py index f959394..84d7576 100644 --- a/src/gps_denied_onboard/_types/fc.py +++ b/src/gps_denied_onboard/_types/fc.py @@ -113,6 +113,17 @@ class ImuTelemetrySample: type) because C8 also carries `received_at` decode-side monotonic_ns from Invariant 7 (out-of-order drop). The C8 inbound path (AZ-391) wraps this in `FcTelemetryFrame`. + + Units and body frame match the ``nav.ImuSample`` contract: accel + in **m/s²**, gyro in **rad/s**, body frame **FLU / Z-up**. The + MAVLink-protocol adapters + (``c8_fc_adapter.tlog_replay_adapter._handle_imu``, + ``c8_fc_adapter._inbound_mavlink._handle_imu``) route raw + SCALED_IMU/RAW_IMU fields through + :func:`gps_denied_onboard.helpers.imu_units.mavlink_imu_to_si_flu` + before construction (AZ-918). The MSP2/iNav decoder + (``_inbound_msp2``) speaks a different wire protocol and owns + its own conversion contract. """ ts_ns: int diff --git a/src/gps_denied_onboard/_types/nav.py b/src/gps_denied_onboard/_types/nav.py index b7e9b16..cfd6615 100644 --- a/src/gps_denied_onboard/_types/nav.py +++ b/src/gps_denied_onboard/_types/nav.py @@ -14,7 +14,7 @@ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal if TYPE_CHECKING: import numpy.typing as npt @@ -22,6 +22,13 @@ if TYPE_CHECKING: from gps_denied_onboard.helpers.se3_utils import SE3 +# AZ-921 — per-frame VIO scale-quality signal that the ESKF honours by +# adapting the translation block of R_meas. See +# `_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md` and the +# ESKF add_vio implementation for the consumer side. +ScaleQuality = Literal["metric", "direction_only", "unknown"] + + @dataclass(frozen=True) class NavCameraFrame: """A single nav-camera frame routed into the pipeline.""" @@ -35,12 +42,25 @@ class NavCameraFrame: @dataclass(frozen=True) class ImuSample: - """A single IMU sample. + """A single IMU sample, normalised to the C1/C5 contract. - Timestamp is monotonic nanoseconds (per FC clock) per the + ``ts_ns`` is monotonic nanoseconds (per FC clock) per the `imu_preintegrator` contract — the preintegrator enforces strict monotonicity on this field, so it MUST be the producer's monotonic source-of-truth, not a wall-clock conversion. + + ``accel_xyz`` and ``gyro_xyz`` are in **SI units** in the **FLU + body frame** (X-forward, Y-left, Z-up; body Z parallel to ENU + world Z at identity rotation): + + * ``accel_xyz`` — specific force in **m/s²**. + * ``gyro_xyz`` — angular rate in **rad/s**. + + MAVLink wire-format fields (``RAW_IMU``, ``SCALED_IMU2/3``) ship + as mG / mrad·s⁻¹ in the FRD body frame, so every adapter that + constructs an ``ImuSample`` from raw MAVLink MUST route through + :func:`gps_denied_onboard.helpers.imu_units.mavlink_imu_to_si_flu` + (AZ-918). """ ts_ns: int @@ -164,6 +184,7 @@ class VioOutput: imu_bias: "ImuBias" feature_quality: FeatureQuality emitted_at_ns: int + scale_quality: ScaleQuality = "unknown" @dataclass(frozen=True) diff --git a/src/gps_denied_onboard/components/c1_vio/_gsd_scale.py b/src/gps_denied_onboard/components/c1_vio/_gsd_scale.py new file mode 100644 index 0000000..acd77da --- /dev/null +++ b/src/gps_denied_onboard/components/c1_vio/_gsd_scale.py @@ -0,0 +1,175 @@ +"""Monocular VIO metric scale recovery via Ground Sample Distance (GSD). + +`cv2.recoverPose` returns a unit-length translation direction; the +absolute metric scale is unrecoverable from a single camera with no +known-size reference. AZ-920 closes that gap for the Derkachi flight +geometry — nadir-pointing camera, fixed-wing aircraft in cruise, +piecewise-planar agricultural terrain — by using the textbook +ground-plane / camera-height scale-recovery method. + +The key observation: for a nadir-pointing pinhole camera at AGL ``h`` +metres with focal length ``f_x`` in pixels, the GSD (ground sample +distance, m/px) is ``h / f_x``. The mean inlier optical-flow magnitude +between consecutive frames, in pixels, therefore corresponds to a +metric ground-plane displacement of ``mean_flow_px * h / f_x``. That +displacement is the magnitude of the camera-XY-plane component of the +translation; dividing by ``||t_unit_xy||`` yields the scale factor to +apply to the unit translation vector. + +References (web-search-verified 2026-05-27): + +- Tian et al. 2021, "A light-weight scale recovery framework based on + plane geometry", `arXiv:2101.05995 `_. +- Song & Chandraker, "Robust Scale Estimation in Real-Time Monocular + SFM for Autonomous Driving", CVPR 2014. +- Wikipedia, `Ground sample distance + `_. + +This module is intentionally a pure-Python helper: no side effects, no +logging, no clock access. The caller (``KltRansacStrategy.process_frame``) +guards the integration boundary and decides what to do when the +returned uncertainty is ``inf`` — that decision is the AZ-921 surface. +""" + +from __future__ import annotations + +import math +from typing import Final + +import numpy as np +import numpy.typing as npt + +from gps_denied_onboard._types.nav import ScaleQuality + +__all__ = [ + "INFINITE_SCALE_UNCERTAINTY_M", + "RELATIVE_DISPLACEMENT_NOISE", + "ScaleRecoveryResult", + "recover_scale", +] + + +# 10% relative-displacement noise budget covering GSD focal-length +# error (~3% per camera_info.md), AGL bias from the ESKF integrator, +# and inlier-flow-mean residual scatter. Pin until AZ-920 lands a real +# Jetson run and we can measure the residual. +RELATIVE_DISPLACEMENT_NOISE: Final[float] = 0.10 + +# Sentinel returned when scale is not recoverable in the current frame +# (AGL unknown, near-pure-vertical motion, zero inlier flow). The +# caller treats this as "do not trust translation magnitude". +INFINITE_SCALE_UNCERTAINTY_M: Final[float] = math.inf + +# AZ-921 — below this horizontal-component magnitude the camera is +# moving essentially along its optical axis (climb/descent for a nadir +# cam), which is degenerate for the GSD MAGNITUDE recovery — but the +# DIRECTION of t_unit is still meaningful, so this case routes to +# scale_quality="direction_only" rather than "unknown". +_MIN_T_HORIZONTAL_MAGNITUDE: Final[float] = 0.05 + +# AZ-921 — below this mean inlier flow magnitude the strategy cannot +# distinguish "stationary aircraft" from "scene-locked features that +# happen not to move" (hover, stale features). Routes to +# scale_quality="unknown" because the geometry is too degenerate to +# trust either the magnitude or the direction of t_unit. +_MIN_MEAN_FLOW_PX: Final[float] = 0.5 + + +class ScaleRecoveryResult: + """Lightweight return tuple — keeps three named values together. + + Not a frozen dataclass to avoid the import-cost overhead on the + KLT/RANSAC hot path (called every frame). + """ + + __slots__ = ("quality", "scale_factor", "scale_uncertainty_m") + + def __init__( + self, + scale_factor: float, + scale_uncertainty_m: float, + quality: ScaleQuality, + ) -> None: + self.scale_factor = scale_factor + self.scale_uncertainty_m = scale_uncertainty_m + self.quality = quality + + def is_recoverable(self) -> bool: + return math.isfinite(self.scale_uncertainty_m) + + def __repr__(self) -> str: + return ( + f"ScaleRecoveryResult(scale_factor={self.scale_factor!r}, " + f"scale_uncertainty_m={self.scale_uncertainty_m!r}, " + f"quality={self.quality!r})" + ) + + +def recover_scale( + *, + t_unit: npt.NDArray[np.float64], + pts_prev: npt.NDArray[np.float64], + pts_curr: npt.NDArray[np.float64], + intrinsics_3x3: npt.NDArray[np.float64], + agl_m: float | None, +) -> ScaleRecoveryResult: + """Compute the metric scale factor for a monocular unit translation. + + Args: + t_unit: Unit-length 3-vector from ``cv2.recoverPose`` (camera + coords; OpenCV convention is X-right, Y-down, Z-forward). + pts_prev: Nx2 inlier feature pixel coordinates in the previous + frame. + pts_curr: Nx2 inlier feature pixel coordinates in the current + frame. + intrinsics_3x3: Camera intrinsic matrix; the function uses + ``f_x = K[0, 0]`` as the nadir-pinhole focal length in + pixels (``f_y`` is assumed equal — the Derkachi factory + calibration matches this). + agl_m: Above-ground-level height in metres, or ``None`` if the + AltitudeProvider could not resolve it. + + Returns: + :class:`ScaleRecoveryResult`. ``scale_uncertainty_m`` is + :data:`INFINITE_SCALE_UNCERTAINTY_M` when scale cannot be + recovered (AGL unknown, zero focal length, degenerate flow, + near-pure-vertical motion). In every degenerate case the + ``scale_factor`` is ``0.0`` so multiplying a unit translation + yields a zero translation — safe default that lets the ESKF's + own bias prior absorb the missing constraint. + + Notes: + Geometry assumption: nadir-pointing pinhole camera. The + derivation is in the module docstring. When the assumption + breaks (banked turn, gimbal not nadir, mountainous terrain), + AZ-921 will gate this output via a ``scale_quality`` signal. + """ + if agl_m is None or agl_m <= 0.0: + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "unknown") + + fx = float(intrinsics_3x3[0, 0]) + if fx <= 0.0 or not math.isfinite(fx): + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "unknown") + + if pts_prev.shape[0] == 0 or pts_curr.shape[0] == 0: + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "unknown") + if pts_prev.shape != pts_curr.shape: + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "unknown") + + flow = pts_curr - pts_prev + flow_magnitudes_px = np.linalg.norm(flow, axis=1) + mean_flow_px = float(np.mean(flow_magnitudes_px)) + if not math.isfinite(mean_flow_px) or mean_flow_px < _MIN_MEAN_FLOW_PX: + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "unknown") + + t_horizontal = float(np.hypot(t_unit[0], t_unit[1])) + if not math.isfinite(t_horizontal) or t_horizontal < _MIN_T_HORIZONTAL_MAGNITUDE: + return ScaleRecoveryResult(0.0, INFINITE_SCALE_UNCERTAINTY_M, "direction_only") + + # GSD m/px = AGL / f_x for nadir pinhole. + gsd_m_per_px = agl_m / fx + displacement_m = mean_flow_px * gsd_m_per_px + scale_factor = displacement_m / t_horizontal + scale_uncertainty_m = displacement_m * RELATIVE_DISPLACEMENT_NOISE + + return ScaleRecoveryResult(scale_factor, scale_uncertainty_m, "metric") diff --git a/src/gps_denied_onboard/components/c1_vio/config.py b/src/gps_denied_onboard/components/c1_vio/config.py index 8e49712..4c4bc11 100644 --- a/src/gps_denied_onboard/components/c1_vio/config.py +++ b/src/gps_denied_onboard/components/c1_vio/config.py @@ -28,6 +28,7 @@ min features for pose, per-frame debug log). Only consulted when from __future__ import annotations +import math from dataclasses import dataclass, field from typing import Final @@ -214,6 +215,15 @@ class KltRansacConfig: internal RANSAC; default 1.0 px in normalised image coordinates (OpenCV's documented default for forward-looking cameras). + ``max_frame_rotation_rad`` is the AZ-922 cheirality gate + threshold — pose-recovery results whose rotation angle exceeds + this value are rejected as the essential-matrix twisted-pair + ambiguity (``cv2.recoverPose`` can return a 180° rotation when + parallax is low). Default 0.524 rad (30°) — a generous upper bound + for any physically plausible single-frame body rotation in + RESTRICT-UAV-3 within the 33 ms / 30 fps frame budget. Must be + in (0, π]. + ``per_frame_debug_log`` enables a DEBUG log line per ``process_frame`` — OFF by default (would flood at 3 Hz steady-state). """ @@ -224,6 +234,7 @@ class KltRansacConfig: min_features_for_pose: int = 30 ransac_inlier_ratio: float = 0.5 essential_matrix_ransac_threshold_px: float = 1.0 + max_frame_rotation_rad: float = math.radians(30.0) per_frame_debug_log: bool = False def __post_init__(self) -> None: @@ -257,6 +268,11 @@ class KltRansacConfig: "KltRansacConfig.essential_matrix_ransac_threshold_px must be " f"> 0; got {self.essential_matrix_ransac_threshold_px}" ) + if not (0.0 < self.max_frame_rotation_rad <= math.pi): + raise ConfigError( + "KltRansacConfig.max_frame_rotation_rad must be in (0, pi]; " + f"got {self.max_frame_rotation_rad}" + ) @dataclass(frozen=True) diff --git a/src/gps_denied_onboard/components/c1_vio/klt_ransac.py b/src/gps_denied_onboard/components/c1_vio/klt_ransac.py index 8337a4c..fe09f56 100644 --- a/src/gps_denied_onboard/components/c1_vio/klt_ransac.py +++ b/src/gps_denied_onboard/components/c1_vio/klt_ransac.py @@ -74,6 +74,7 @@ Risk mitigations (see task spec for full text): from __future__ import annotations +import math from typing import TYPE_CHECKING, Any, Final, Literal import cv2 @@ -82,6 +83,7 @@ import numpy as np from gps_denied_onboard._types.nav import ( FeatureQuality, ImuBias, + ScaleQuality, VioHealth, VioOutput, VioState, @@ -92,6 +94,10 @@ from gps_denied_onboard.components.c1_vio._facade_spine import ( bias_norm, se3_from_4x4, ) +from gps_denied_onboard.components.c1_vio._gsd_scale import ( + ScaleRecoveryResult, + recover_scale, +) from gps_denied_onboard.components.c1_vio.errors import ( VioFatalError, VioInitializingError, @@ -117,6 +123,7 @@ if TYPE_CHECKING: from gps_denied_onboard.components.c1_vio.config import KltRansacConfig from gps_denied_onboard.config import Config from gps_denied_onboard.fdr_client.client import FdrClient + from gps_denied_onboard.helpers.altitude_provider import AltitudeProvider __all__ = ["KltRansacStrategy"] @@ -134,6 +141,13 @@ _ESSENTIAL_MATRIX_DOF: Final[int] = 5 # (relative pose is identity anyway). Documented limit, not derived. _INIT_STATE_COVARIANCE_SCALAR: Final[float] = 10.0 +# AZ-920: floor on the per-axis position variance for scale-recovered +# translations. Even with perfect inlier flow the GSD has a +# ~3 % focal-length error (see flight_derkachi/camera_info.md) and +# the AGL has ESKF integrator bias, so a sub-millimetre cov is +# implausible. 1 cm sigma per axis is the smallest defensible value. +_MIN_POSITION_VARIANCE_M2: Final[float] = 1.0e-4 + def _grayscale(image: npt.NDArray[Any]) -> npt.NDArray[Any]: """Coerce a NavCameraFrame image to OpenCV's expected 2D ``uint8``. @@ -168,6 +182,21 @@ def _grayscale(image: npt.NDArray[Any]) -> npt.NDArray[Any]: ) +def _rotation_angle_rad(R: np.ndarray) -> float: + """Axis-angle magnitude of a 3x3 rotation matrix. + + AZ-922 cheirality gate primitive. Uses ``θ = arccos((trace(R) - 1) / 2)``; + the ``arccos`` argument is clipped to ``[-1, +1]`` to defend against + floating-point drift on near-identity rotations (where ``trace`` can + exceed 3.0 by ~1e-15) and on the 180° antipode (where it can dip + below -1.0 by the same amount). The returned angle is always in + ``[0, π]``. + """ + trace = float(np.trace(R[:3, :3])) + cos_theta = max(-1.0, min(1.0, (trace - 1.0) / 2.0)) + return float(math.acos(cos_theta)) + + def _intrinsics_3x3(calibration: CameraCalibration) -> np.ndarray: """Pull the 3x3 intrinsics matrix from a CameraCalibration DTO. @@ -208,6 +237,7 @@ class KltRansacStrategy(FacadeSpine): *, fdr_client: FdrClient, clock: Clock | None = None, + altitude_provider: AltitudeProvider | None = None, ) -> None: c1_block = config.components["c1_vio"] if c1_block.strategy != _STRATEGY_LABEL: @@ -227,6 +257,12 @@ class KltRansacStrategy(FacadeSpine): self._feature_threshold: int = self._cfg.min_features_for_pose self._producer_id: str = _PRODUCER_ID self._strategy_label: str = _STRATEGY_LABEL + # AZ-919: plumbing only — stored on the instance but not yet consumed. + # The scale-recovery math that consumes this lands in AZ-920; + # the degraded-mode signal when the provider yields ``None`` + # lands in AZ-921. Default ``None`` preserves AC-3 bug-for-bug + # compatibility with strategies that have not been re-wired. + self._altitude_provider: AltitudeProvider | None = altitude_provider # Per-frame state. self._prev_gray: np.ndarray | None = None @@ -383,20 +419,70 @@ class KltRansacStrategy(FacadeSpine): reason="recover_pose_no_solution", ) - # 7. Build SE(3) from (R, t). + # 6.5 AZ-922 — cheirality gate. ``cv2.recoverPose`` can return + # a 180°-flipped rotation (essential-matrix twisted-pair + # ambiguity) under low-parallax conditions. Reject any + # single-frame rotation that exceeds the RESTRICT-UAV-3 + # angular-velocity envelope. Routes through the same + # _pose_recovery_failed path as any other failed-pose case so + # AC-7 (LOST after N consecutive failures) is honoured. + R_3x3 = np.asarray(R, dtype=np.float64) + rotation_angle_rad = _rotation_angle_rad(R_3x3) + if rotation_angle_rad > self._cfg.max_frame_rotation_rad: + return self._pose_recovery_failed( + frame_id_str, + emitted_at_ns, + prior_feature_count=prior_feature_count, + reason=( + f"implausible_rotation_angle: " + f"{math.degrees(rotation_angle_rad):.1f}deg > " + f"{math.degrees(self._cfg.max_frame_rotation_rad):.1f}deg threshold" + ), + ) + + # 7. AZ-920 — recover metric scale for the otherwise-unit + # translation. Operates on the final inlier set (AZ-282 + + # findEssentialMat's internal RANSAC). Only fires when the + # AltitudeProvider is wired by the AZ-919 composition root; + # when it is None (unit-test construction, legacy callers) the + # strategy preserves pre-AZ-920 behaviour bug-for-bug + # (AZ-919 AC-3 compatibility). + t_unit = np.asarray(t, dtype=np.float64).flatten() + t_metric = t_unit + scale_result: ScaleRecoveryResult | None = None + if self._altitude_provider is not None: + if em_mask is not None: + em_mask_flat = np.asarray(em_mask).flatten().astype(bool) + final_pts_prev = pts_prev[em_mask_flat] + final_pts_curr = pts_curr[em_mask_flat] + else: + final_pts_prev = pts_prev + final_pts_curr = pts_curr + agl_m = self._altitude_provider.agl_m(emitted_at_ns) + scale_result = recover_scale( + t_unit=t_unit, + pts_prev=final_pts_prev, + pts_curr=final_pts_curr, + intrinsics_3x3=K, + agl_m=agl_m, + ) + if scale_result.is_recoverable(): + t_metric = scale_result.scale_factor * t_unit + + # 8. Build SE(3) from (R, t_metric). pose_4x4 = np.eye(4, dtype=np.float64) pose_4x4[:3, :3] = np.asarray(R, dtype=np.float64) - pose_4x4[:3, 3] = np.asarray(t, dtype=np.float64).flatten() + pose_4x4[:3, 3] = t_metric pose = se3_from_4x4(pose_4x4) - # 8. Final inlier count for state classification + covariance. + # 9. Final inlier count for state classification + covariance. final_inlier_count = ( int(np.count_nonzero(em_mask)) if em_mask is not None else ransac_result.inlier_count ) if final_inlier_count < _ESSENTIAL_MATRIX_DOF: final_inlier_count = ransac_result.inlier_count - # 9. Estimate covariance from the AZ-282 median residual + + # 10. Estimate covariance from the AZ-282 median residual + # inlier-count penalty. Honest-covariance invariant (AC-9): no # client-side floor; the formula is residual_var / DOF + small # inlier-count term so the cov grows monotonically as inliers @@ -406,6 +492,24 @@ class KltRansacStrategy(FacadeSpine): inlier_count=final_inlier_count, ) + # 11. AZ-920/AZ-921 — when scale_quality == "metric", replace the + # position block of cov with the metric variance from GSD scale + # recovery so consumers reading pose_covariance_6x6 directly get + # honest m² units. When scale_quality is "direction_only" or + # "unknown" the cov stays in the geometry-natural units; the + # ESKF (AZ-921 add_vio) inflates its own R_meas based on the + # scale_quality flag, so it does not need the cov to encode the + # "do not trust me" message. + scale_quality: ScaleQuality = ( + scale_result.quality if scale_result is not None else "unknown" + ) + if scale_result is not None and scale_quality == "metric": + pos_var = max( + float(scale_result.scale_uncertainty_m) ** 2, + _MIN_POSITION_VARIANCE_M2, + ) + cov[0:3, 0:3] = np.eye(3, dtype=np.float64) * pos_var + # 10. Build VioOutput. fq = FeatureQuality( tracked=int(final_inlier_count), @@ -423,6 +527,7 @@ class KltRansacStrategy(FacadeSpine): imu_bias=self._latest_bias, feature_quality=fq, emitted_at_ns=emitted_at_ns, + scale_quality=scale_quality, ) # 11. Success path — reset lost counter, classify state. diff --git a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py index d353bcc..c95639f 100644 --- a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py +++ b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py @@ -56,7 +56,6 @@ import numpy as np from numpy.linalg import LinAlgError from gps_denied_onboard._types.geo import LatLonAlt -from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard._types.state import ( EstimatorHealth, EstimatorOutput, @@ -64,22 +63,23 @@ from gps_denied_onboard._types.state import ( PoseSourceLabel, Quat, ) +from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c5_state._fallback_watcher import ( FallbackEngagementCallback, FallbackRecoveryCallback, FallbackSubscription, FallbackWatcher, ) -from gps_denied_onboard.components.c5_state._source_label_sm import ( - RejectionCallback, - RejectionSubscription, - SourceLabelStateMachine, -) from gps_denied_onboard.components.c5_state._orthorectifier import ( MidFlightTileWriter, Orthorectifier, OrthorectifierThresholds, ) +from gps_denied_onboard.components.c5_state._source_label_sm import ( + RejectionCallback, + RejectionSubscription, + SourceLabelStateMachine, +) from gps_denied_onboard.components.c5_state.config import C5StateConfig from gps_denied_onboard.components.c5_state.errors import ( EstimatorAlreadyStartedError, @@ -139,6 +139,17 @@ _PROC_SIGMA_BG: Final[float] = 0.0001 # Fall-back measurement noise when the input DTO has no covariance. _DEFAULT_MEAS_SIGMA: Final[float] = 0.1 +# AZ-921 — translation-block R_meas adaptation when the VIO scale signal +# is degraded. The unit-length translation from cv2.recoverPose carries +# direction information but no magnitude; we trust the direction at the +# DIRECTION-ONLY level so the rotation update can still constrain the +# state, then a follow-up scale-recovery event will reduce the position +# uncertainty. UNKNOWN is the "effectively skip the translation update" +# variance — large enough that the Kalman gain block is essentially +# zero, while still finite so the Mahalanobis innovation gate computes. +_DIRECTION_ONLY_TRANSLATION_SIGMA_M: Final[float] = 8.0 +_UNKNOWN_TRANSLATION_SIGMA_M: Final[float] = 1000.0 + # Filter divergence threshold (AC-9): Mahalanobis ``r^T S^{-1} r`` > # (10 sigma)^2 = 100 triggers EstimatorFatalError + AC-5.2 fallback. # The spec wording is "innovation exceeds 10x the measurement-covariance @@ -243,8 +254,8 @@ class EskfStateEstimator(StateEstimator): # historically did NOT cache the full DTO so we keep a # dedicated field rather than abuse ``_prev_vio_*``. self._orthorectifier: Orthorectifier | None = None - self._latest_nav_frame: "NavCameraFrame | None" = None - self._latest_vio: "VioOutput | None" = None + self._latest_nav_frame: NavCameraFrame | None = None + self._latest_vio: VioOutput | None = None self._log.debug( "c5.state.eskf_initialised", @@ -296,7 +307,7 @@ class EskfStateEstimator(StateEstimator): """ self._orthorectifier = orthorectifier - def set_latest_nav_frame(self, frame: "NavCameraFrame") -> None: + def set_latest_nav_frame(self, frame: NavCameraFrame) -> None: """AZ-389 — buffer the most recent nav-camera frame. Quiet no-op when no orthorectifier is attached. See @@ -541,6 +552,12 @@ class EskfStateEstimator(StateEstimator): H[0:3, _IDX_POS] = np.eye(3) H[3:6, _IDX_ROT] = prev_R # rotate body-frame perturbation back to world R_meas = _measurement_noise(vio.pose_covariance_6x6) + # AZ-921 — VIO scale-quality gating on the translation block of + # R_meas. The rotation block is left as the VIO emitted it + # regardless of scale_quality; only the magnitude of t is + # affected by monocular scale ambiguity, not its direction in + # body frame or the rotation matrix. + R_meas = _adapt_vio_r_meas(R_meas, vio.scale_quality) try: self._kalman_update(H, residual, R_meas, source="vio") @@ -991,7 +1008,7 @@ def create( wgs_converter: Any, fdr_client: Any, mid_flight_tile_writer: MidFlightTileWriter | None = None, - camera_calibration: "CameraCalibration | None" = None, + camera_calibration: CameraCalibration | None = None, flight_id: str | None = None, companion_id: str | None = None, ) -> tuple[StateEstimator, Any]: @@ -1086,6 +1103,30 @@ def _measurement_noise(covariance: Any | None) -> np.ndarray: return cov +def _adapt_vio_r_meas(r_meas: np.ndarray, scale_quality: str) -> np.ndarray: + """AZ-921 — translation-block adaptation per VIO scale-quality. + + When the VIO strategy reports it could not recover metric scale, + the translation block of R_meas is overwritten so the Kalman gain + in that block goes to (near) zero, leaving the rotation block + free to constrain the state. The Mahalanobis innovation gate + stays at 100 (10 sigma); large translation R_meas keeps the test + finite and well below the threshold even on large residuals. + + The function does not mutate the input array. + """ + if scale_quality == "metric": + return r_meas + if scale_quality == "direction_only": + translation_variance = _DIRECTION_ONLY_TRANSLATION_SIGMA_M**2 + else: + # "unknown" and any defensive default + translation_variance = _UNKNOWN_TRANSLATION_SIGMA_M**2 + adapted = r_meas.copy() + adapted[0:3, 0:3] = np.eye(3, dtype=np.float64) * translation_variance + return adapted + + def _skew(v: np.ndarray) -> np.ndarray: return np.array( [ diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py index 4ef396c..93cc36c 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py @@ -36,6 +36,7 @@ from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing +from gps_denied_onboard.helpers.imu_units import mavlink_imu_to_si_flu from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: @@ -194,17 +195,26 @@ class PymavlinkInboundDecoder: # Per-type handlers def _handle_imu(self, msg: Any) -> bool: - # pymavlink RAW_IMU exposes raw signed-16 ticks; we forward as-is. - accel = (float(msg.xacc), float(msg.yacc), float(msg.zacc)) - gyro = (float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)) + # RAW_IMU / SCALED_IMU2 wire format is mG accel + mrad/s gyro in + # FRD body; convert to SI/FLU at the boundary (AZ-918) so + # downstream consumers see the contract ImuTelemetrySample + # advertises. + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=float(msg.xacc), + yacc=float(msg.yacc), + zacc=float(msg.zacc), + xgyro=float(msg.xgyro), + ygyro=float(msg.ygyro), + zgyro=float(msg.zgyro), + ) # `time_usec` is FC-side; we override with monotonic_ns at decode boundary # for the FdrTelemetryFrame.received_at; the payload's ts_ns is the # FC sensor stamp (epoch us -> ns). sensor_ts_ns = int(msg.time_usec) * 1000 payload = ImuTelemetrySample( ts_ns=sensor_ts_ns, - accel_xyz=accel, - gyro_xyz=gyro, + accel_xyz=accel_si_flu, + gyro_xyz=gyro_si_flu, ) return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py index 4f84b1d..71cb4e0 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py @@ -81,6 +81,7 @@ from gps_denied_onboard.components.c8_fc_adapter.errors import ( ) from gps_denied_onboard.components.c8_fc_adapter.interface import MavlinkTransport from gps_denied_onboard.fdr_client.records import FdrRecord +from gps_denied_onboard.helpers.imu_units import mavlink_imu_to_si_flu from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now from gps_denied_onboard.logging import get_logger @@ -716,20 +717,18 @@ class TlogReplayFcAdapter: def _handle_imu(self, msg: Any) -> bool: sensor_ts_ns = int(getattr(msg, "time_usec", 0)) * 1000 - accel = ( - float(msg.xacc), - float(msg.yacc), - float(msg.zacc), - ) - gyro = ( - float(msg.xgyro), - float(msg.ygyro), - float(msg.zgyro), + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=float(msg.xacc), + yacc=float(msg.yacc), + zacc=float(msg.zacc), + xgyro=float(msg.xgyro), + ygyro=float(msg.ygyro), + zgyro=float(msg.zgyro), ) payload = ImuTelemetrySample( ts_ns=sensor_ts_ns, - accel_xyz=accel, - gyro_xyz=gyro, + accel_xyz=accel_si_flu, + gyro_xyz=gyro_si_flu, ) return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, msg=msg) diff --git a/src/gps_denied_onboard/helpers/altitude_provider.py b/src/gps_denied_onboard/helpers/altitude_provider.py new file mode 100644 index 0000000..2a0c08e --- /dev/null +++ b/src/gps_denied_onboard/helpers/altitude_provider.py @@ -0,0 +1,93 @@ +"""Above-Ground-Level (AGL) provider abstraction for C1 VIO scale recovery. + +The monocular KLT/RANSAC strategy (AZ-334) recovers metric scale from the +unit-length translation that ``cv2.recoverPose`` emits by using ground +sample distance (GSD), which requires the current drone height above the +ground plane. The AGL signal lives in the C5 state estimator's nominal +position; this module wraps that read so the C1 strategy does not import +or hold a direct reference to the C5 estimator (which is built later in +the composition-root topological order). + +AZ-919 introduces only the interface + plumbing. The GSD scale-recovery +math lands in AZ-920, and the degraded-mode signal in AZ-921. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +from gps_denied_onboard._types.state import IsamState + +if TYPE_CHECKING: + from collections.abc import Callable + + from gps_denied_onboard.components.c5_state.eskf_baseline import ( + EskfStateEstimator, + ) + +__all__ = [ + "AltitudeProvider", + "EskfNominalAltitudeProvider", +] + + +@runtime_checkable +class AltitudeProvider(Protocol): + """Read the drone's current AGL height in metres, or ``None``. + + Producers MUST return ``None`` whenever the local-ENU origin has not + yet been anchored (pre cold-start) or the underlying estimator is + in :class:`IsamState.LOST`. Consumers MUST treat ``None`` as "no + reliable AGL" and fall back to a non-scale-recovery code path + (AZ-921 formalises that fallback as a degraded VIO output). + """ + + def agl_m(self, now_ns: int) -> float | None: # pragma: no cover - Protocol + """Return AGL in metres at ``now_ns`` (monotonic), or ``None``. + + ``now_ns`` is the same monotonic timebase used by the C1 strategy + for ``VioOutput.emitted_at_ns``. It is currently advisory — the + ESKF impl does not interpolate — but the parameter is in the + Protocol so future implementations (e.g. an LPF-smoothed AGL or + a DEM-aware provider) can interpolate or extrapolate without a + breaking change. + """ + ... + + +class EskfNominalAltitudeProvider: + """Concrete :class:`AltitudeProvider` backed by the C5 ESKF estimator. + + Reads AGL as the Z component of the ESKF nominal-position vector in + local-ENU. The takeoff origin is anchored at local-ENU ``(0, 0, 0)`` + when ``set_takeoff_origin`` lands, so ``nominal_pos_z`` IS the AGL + once the origin has been set — no separate cold-start-altitude + subtraction is needed. + + The estimator instance is supplied through a callable rather than + held directly because the composition root builds C1 (where this + provider is wired) before C5. The callable closes over the + composition root's mutable ``constructed`` dict and resolves the + estimator at every ``agl_m`` call, which is the same time the C1 + strategy actually consumes the AGL signal (well after the topo + order has built C5). + """ + + def __init__( + self, + estimator_supplier: Callable[[], EskfStateEstimator | None], + ) -> None: + self._estimator_supplier = estimator_supplier + + def agl_m(self, now_ns: int) -> float | None: + estimator = self._estimator_supplier() + if estimator is None: + return None + if getattr(estimator, "_takeoff_origin_set", None) is None: + return None + if getattr(estimator, "_isam2_state", None) == IsamState.LOST: + return None + nominal_pos = getattr(estimator, "_nominal_pos", None) + if nominal_pos is None: + return None + return float(nominal_pos[2]) diff --git a/src/gps_denied_onboard/helpers/imu_units.py b/src/gps_denied_onboard/helpers/imu_units.py new file mode 100644 index 0000000..0b85789 --- /dev/null +++ b/src/gps_denied_onboard/helpers/imu_units.py @@ -0,0 +1,76 @@ +"""MAVLink IMU unit + frame conversion to the nav-side SI/FLU contract. + +The MAVLink ``RAW_IMU`` and ``SCALED_IMU2`` / ``SCALED_IMU3`` messages +ship accelerometer values as **milli-g** (mG, where ``1 mG = 9.80665e-3 +m/s²``) and gyroscope values as **milli-radians per second** (mrad/s). +The body frame is **FRD** (X-forward, Y-right, Z-down). + +The downstream consumers (``c5_state.eskf_baseline``, the +GTSAM-backed ``helpers.imu_preintegrator``) expect: + +* Accelerometer in **m/s²**. +* Gyroscope in **rad/s**. +* Body frame **FLU / Z-up** so the body Z axis is parallel to the + ENU world Z axis at identity rotation. The ``c5_state`` + stationary-attitude unit test pins this with a specific-force + vector of ``(0, 0, +9.80665)``. + +This module is the single source of truth for that conversion. +Every adapter that constructs an ``ImuSample`` or +``ImuTelemetrySample`` from raw MAVLink fields MUST route through +``mavlink_imu_to_si_flu`` (AZ-918). + +The conversion is intentionally pure: no allocations beyond the +returned tuples, no logging, no clock access. Cheap enough to call +once per IMU sample at 100 Hz on the Tier-2 (Jetson) hot path. +""" + +from __future__ import annotations + +from typing import Final + +__all__ = [ + "MG_TO_M_S2", + "MRAD_S_TO_RAD_S", + "mavlink_imu_to_si_flu", +] + +MG_TO_M_S2: Final[float] = 9.80665e-3 +MRAD_S_TO_RAD_S: Final[float] = 1.0e-3 + + +def mavlink_imu_to_si_flu( + *, + xacc: float, + yacc: float, + zacc: float, + xgyro: float, + ygyro: float, + zgyro: float, +) -> tuple[tuple[float, float, float], tuple[float, float, float]]: + """Convert MAVLink ``RAW_IMU``/``SCALED_IMU*`` fields to SI/FLU. + + Args: + xacc, yacc, zacc: MAVLink accelerometer fields in mG (FRD body). + xgyro, ygyro, zgyro: MAVLink gyroscope fields in mrad/s (FRD body). + + Returns: + Two 3-tuples ``(accel_xyz_si_flu, gyro_xyz_si_flu)``: + + * Accel in m/s², body frame FLU (Z-up). + * Gyro in rad/s, body frame FLU (Z-up). + + The FRD→FLU transform is a Y- and Z-axis negation (X is + unchanged because both conventions point X forward). + """ + accel_xyz_si_flu = ( + xacc * MG_TO_M_S2, + -yacc * MG_TO_M_S2, + -zacc * MG_TO_M_S2, + ) + gyro_xyz_si_flu = ( + xgyro * MRAD_S_TO_RAD_S, + -ygyro * MRAD_S_TO_RAD_S, + -zgyro * MRAD_S_TO_RAD_S, + ) + return accel_xyz_si_flu, gyro_xyz_si_flu diff --git a/src/gps_denied_onboard/replay_input/csv_ground_truth.py b/src/gps_denied_onboard/replay_input/csv_ground_truth.py index bb01756..b4f4689 100644 --- a/src/gps_denied_onboard/replay_input/csv_ground_truth.py +++ b/src/gps_denied_onboard/replay_input/csv_ground_truth.py @@ -10,10 +10,14 @@ Schema (19 cols, header-required): * ``timestamp(ms)`` — FC-boot-relative ms, kept for traceability only. * ``Time`` — flight-relative seconds, the canonical clock. -* ``SCALED_IMU2.{x,y,z}{acc,gyro,mag}`` — 10 Hz IMU stream (raw mg / mrad/s - / mGauss per ArduPilot convention; preserved unchanged to match the - byte-for-byte semantics the tlog adapter uses for ``ImuSample.accel_xyz`` - and ``gyro_xyz``). +* ``SCALED_IMU2.{x,y,z}{acc,gyro,mag}`` — 10 Hz IMU stream. CSV columns + preserve the MAVLink wire format (mG accel, mrad/s gyro, mGauss mag, + FRD body frame). The parser converts accel and gyro to SI units in the + FLU body frame via + :func:`gps_denied_onboard.helpers.imu_units.mavlink_imu_to_si_flu` + before constructing each :class:`ImuSample` (AZ-918) so the downstream + C5 ESKF + ``imu_preintegrator`` see the contract they were built for. + Magnetometer columns are read but currently not consumed. * ``GLOBAL_POSITION_INT.{lat,lon,alt,relative_alt,vx,vy,vz,hdg}`` — 10 Hz GPS truth. ``lat``/``lon`` already in degrees (Derkachi dump format), ``alt`` in mm, ``vx``/``vy``/``vz`` in cm/s, ``hdg`` in cdeg. @@ -32,6 +36,7 @@ from pathlib import Path from typing import Final from gps_denied_onboard._types.nav import ImuSample +from gps_denied_onboard.helpers.imu_units import mavlink_imu_to_si_flu from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError __all__ = [ @@ -100,9 +105,11 @@ class CsvGroundTruth: Attributes: records: Time-ordered GPS fixes. imu_samples: Time-ordered IMU samples; one per CSV row (10 Hz). - Values preserve the raw mg / mrad/s units the tlog adapter - also passes through so C1 / C5 see identical numeric shapes - regardless of input source. + Values are normalised to the :class:`ImuSample` contract: + accel in m/s², gyro in rad/s, body frame FLU. The raw CSV + columns ship in MAVLink wire format (mG / mrad/s, FRD); + :func:`mavlink_imu_to_si_flu` applies the conversion + inside :func:`_parse_imu`. source: Discriminator label echoed into the cold-start FDR event (``"GLOBAL_POSITION_INT_CSV"`` for the Derkachi CSV). """ @@ -204,17 +211,15 @@ def _parse_imu( row_idx: int, ts_ns: int, ) -> ImuSample: - accel = ( - _parse_float(row, "SCALED_IMU2.xacc", path, row_idx), - _parse_float(row, "SCALED_IMU2.yacc", path, row_idx), - _parse_float(row, "SCALED_IMU2.zacc", path, row_idx), + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=_parse_float(row, "SCALED_IMU2.xacc", path, row_idx), + yacc=_parse_float(row, "SCALED_IMU2.yacc", path, row_idx), + zacc=_parse_float(row, "SCALED_IMU2.zacc", path, row_idx), + xgyro=_parse_float(row, "SCALED_IMU2.xgyro", path, row_idx), + ygyro=_parse_float(row, "SCALED_IMU2.ygyro", path, row_idx), + zgyro=_parse_float(row, "SCALED_IMU2.zgyro", path, row_idx), ) - gyro = ( - _parse_float(row, "SCALED_IMU2.xgyro", path, row_idx), - _parse_float(row, "SCALED_IMU2.ygyro", path, row_idx), - _parse_float(row, "SCALED_IMU2.zgyro", path, row_idx), - ) - return ImuSample(ts_ns=ts_ns, accel_xyz=accel, gyro_xyz=gyro) + return ImuSample(ts_ns=ts_ns, accel_xyz=accel_si_flu, gyro_xyz=gyro_si_flu) def _parse_gps( diff --git a/src/gps_denied_onboard/runtime_root/__init__.py b/src/gps_denied_onboard/runtime_root/__init__.py index fff7629..eca4827 100644 --- a/src/gps_denied_onboard/runtime_root/__init__.py +++ b/src/gps_denied_onboard/runtime_root/__init__.py @@ -802,6 +802,7 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: EstimatorDegradedError, EstimatorFatalError, ) + from gps_denied_onboard.helpers.imu_units import mavlink_imu_to_si_flu from gps_denied_onboard.logging import get_logger from gps_denied_onboard.replay_input.csv_ground_truth import ( load_csv_ground_truth, @@ -993,9 +994,10 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: Branches on ``using_csv`` so the closure stays a single definition. Both branches respect the same ``pending_imu`` buffer + ``imu_anchor_ns`` / ``imu_eof`` state and produce - :class:`ImuSample` instances with identical numeric semantics - — the tlog branch matches :meth:`TlogReplayFcAdapter._handle_imu` - for byte-for-byte compatibility with the legacy path. + :class:`ImuSample` instances in SI/FLU per the ImuSample + contract — the tlog branch routes raw MAVLink fields through + :func:`mavlink_imu_to_si_flu` so it matches the CSV branch + and :meth:`TlogReplayFcAdapter._handle_imu` (AZ-918). """ nonlocal imu_anchor_ns, imu_eof, csv_imu_idx while not imu_eof: @@ -1018,10 +1020,18 @@ def _run_replay_loop(config: Config, runtime: RuntimeRoot) -> int: ts_ns = int(getattr(msg, "time_usec", 0)) * 1000 if ts_ns == 0: continue + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=float(msg.xacc), + yacc=float(msg.yacc), + zacc=float(msg.zacc), + xgyro=float(msg.xgyro), + ygyro=float(msg.ygyro), + zgyro=float(msg.zgyro), + ) sample = ImuSample( ts_ns=ts_ns, - accel_xyz=(float(msg.xacc), float(msg.yacc), float(msg.zacc)), - gyro_xyz=(float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)), + accel_xyz=accel_si_flu, + gyro_xyz=gyro_si_flu, ) if imu_anchor_ns is None: imu_anchor_ns = sample.ts_ns diff --git a/src/gps_denied_onboard/runtime_root/airborne_bootstrap.py b/src/gps_denied_onboard/runtime_root/airborne_bootstrap.py index 67e9189..8c92400 100644 --- a/src/gps_denied_onboard/runtime_root/airborne_bootstrap.py +++ b/src/gps_denied_onboard/runtime_root/airborne_bootstrap.py @@ -58,6 +58,7 @@ from typing import TYPE_CHECKING, Any, Final from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.fdr_client.client import make_fdr_client +from gps_denied_onboard.helpers.altitude_provider import EskfNominalAltitudeProvider from gps_denied_onboard.helpers.feature_extractor import OpenCvOrbExtractor from gps_denied_onboard.helpers.imu_preintegrator import ( ImuPreintegrator, @@ -305,7 +306,18 @@ def _require(constructed: Mapping[str, Any], key: str, component_slug: str) -> A def _c1_vio_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: fdr_client = _require(constructed, "c13_fdr", "c1_vio") - return build_vio_strategy(config, fdr_client=fdr_client) + # AZ-919: AGL is read from the C5 ESKF estimator. C5 is built AFTER C1 + # in the topological order, so the provider holds a callable that + # resolves the estimator from ``constructed`` at call time (i.e. when + # the strategy actually consumes AGL during ``process_frame``). + altitude_provider = EskfNominalAltitudeProvider( + estimator_supplier=lambda: constructed.get("c5_state"), + ) + return build_vio_strategy( + config, + fdr_client=fdr_client, + altitude_provider=altitude_provider, + ) def _c2_vpr_wrapper(config: Config, constructed: Mapping[str, Any]) -> Any: diff --git a/src/gps_denied_onboard/runtime_root/vio_factory.py b/src/gps_denied_onboard/runtime_root/vio_factory.py index 6b6d0cb..89bbdcd 100644 --- a/src/gps_denied_onboard/runtime_root/vio_factory.py +++ b/src/gps_denied_onboard/runtime_root/vio_factory.py @@ -14,6 +14,7 @@ verifiable via ``sys.modules``). from __future__ import annotations +import inspect import os from typing import TYPE_CHECKING @@ -26,6 +27,7 @@ if TYPE_CHECKING: ) from gps_denied_onboard.components.c13_fdr import FdrClient from gps_denied_onboard.config.schema import Config + from gps_denied_onboard.helpers.altitude_provider import AltitudeProvider __all__ = ["build_vio_strategy"] @@ -74,6 +76,7 @@ def build_vio_strategy( config: "Config", *, fdr_client: "FdrClient", + altitude_provider: "AltitudeProvider | None" = None, ) -> "VioStrategy": """Construct the :class:`VioStrategy` impl selected by config. @@ -82,11 +85,18 @@ def build_vio_strategy( :class:`StrategyNotAvailableError` BEFORE any import. 3. Lazily imports the concrete strategy module. 4. Constructs and returns the strategy instance, passing - ``config`` and ``fdr_client``. + ``config`` and ``fdr_client`` and (if the strategy's + ``__init__`` accepts it) ``altitude_provider`` — AZ-919. Raises :class:`StrategyNotAvailableError` when the compile-time flag is OFF (canonical Tier-0 path) or when the concrete strategy module has not been built yet (AZ-332 / AZ-333 / AZ-334 pending). + + The ``altitude_provider`` kwarg is introspected against the + selected strategy's ``__init__`` signature so OKVIS2 / VINS-Mono + do not need to grow a parameter they do not yet consume; the + KLT/RANSAC strategy accepts it (per AZ-919 AC-3) and will use it + for GSD-based scale recovery in AZ-920. """ block = _c1_config(config) strategy = block.strategy @@ -111,4 +121,7 @@ def build_vio_strategy( "yet (AZ-332 / AZ-333 / AZ-334 pending)." ) from exc strategy_cls = getattr(module, class_name) - return strategy_cls(config, fdr_client=fdr_client) + kwargs: dict[str, object] = {"fdr_client": fdr_client} + if "altitude_provider" in inspect.signature(strategy_cls).parameters: + kwargs["altitude_provider"] = altitude_provider + return strategy_cls(config, **kwargs) diff --git a/tests/unit/c1_vio/test_az919_klt_ransac_altitude_provider.py b/tests/unit/c1_vio/test_az919_klt_ransac_altitude_provider.py new file mode 100644 index 0000000..8c2c7eb --- /dev/null +++ b/tests/unit/c1_vio/test_az919_klt_ransac_altitude_provider.py @@ -0,0 +1,70 @@ +"""AZ-919 — `KltRansacStrategy` accepts the `altitude_provider` kwarg. + +AZ-919 is plumbing only: the strategy must accept and store the provider +without yet consuming it. The math that consumes it lands in AZ-920; +the degraded-mode behaviour lands in AZ-921. Until then, the strategy +MUST behave identically to today when ``altitude_provider`` is ``None`` +(AC-3 bug-for-bug compatibility). +""" + +from __future__ import annotations + +from gps_denied_onboard.components.c1_vio import C1VioConfig, KltRansacConfig +from gps_denied_onboard.components.c1_vio.klt_ransac import KltRansacStrategy +from gps_denied_onboard.config.schema import Config, RuntimeConfig +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.helpers.altitude_provider import ( + AltitudeProvider, + EskfNominalAltitudeProvider, +) + + +def _config() -> Config: + c1 = C1VioConfig( + strategy="klt_ransac", + klt_ransac=KltRansacConfig(), + lost_frame_threshold=3, + ) + return Config.with_blocks(c1_vio=c1, runtime=RuntimeConfig()) + + +def _fdr_client() -> FdrClient: + return FdrClient(producer_id="test.az919", capacity=8, _emit_diag_log=False) + + +def test_default_altitude_provider_is_none() -> None: + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + + assert strategy._altitude_provider is None + + +def test_explicit_altitude_provider_is_stored() -> None: + provider: AltitudeProvider = EskfNominalAltitudeProvider( + estimator_supplier=lambda: None, + ) + + strategy = KltRansacStrategy( + _config(), + fdr_client=_fdr_client(), + altitude_provider=provider, + ) + + assert strategy._altitude_provider is provider + + +def test_altitude_provider_is_keyword_only() -> None: + # Arrange — positional pass of altitude_provider must fail because + # the kwarg is declared after ``*``. This pins the calling contract + # so future refactors cannot accidentally make it positional. + fdr_client = _fdr_client() + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: None) + + # Act + Assert + try: + KltRansacStrategy(_config(), fdr_client, provider) # type: ignore[misc] + except TypeError: + return + raise AssertionError( + "KltRansacStrategy accepted altitude_provider positionally; " + "AZ-919 AC-3 requires the kwarg to be keyword-only." + ) diff --git a/tests/unit/c1_vio/test_az920_gsd_scale.py b/tests/unit/c1_vio/test_az920_gsd_scale.py new file mode 100644 index 0000000..d616ee9 --- /dev/null +++ b/tests/unit/c1_vio/test_az920_gsd_scale.py @@ -0,0 +1,254 @@ +"""AZ-920 — `recover_scale` pure-function tests. + +Pins the GSD scale-recovery contract that the KLT/RANSAC integration +in ``KltRansacStrategy.process_frame`` relies on. The integration +test is in ``test_az920_klt_ransac_scale_integration.py``; this file +covers the math in isolation. +""" + +from __future__ import annotations + +import math + +import numpy as np +import pytest + +from gps_denied_onboard.components.c1_vio._gsd_scale import ( + INFINITE_SCALE_UNCERTAINTY_M, + RELATIVE_DISPLACEMENT_NOISE, + ScaleRecoveryResult, + recover_scale, +) + +# ---------------------------------------------------------------------- +# Helpers + + +def _intrinsics(fx: float = 1680.4469) -> np.ndarray: + """Derkachi-grade nadir intrinsics by default (fx=fy, cx/cy at centre).""" + return np.array( + [ + [fx, 0.0, 960.0], + [0.0, fx, 540.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + + +def _uniform_flow(n: int, dx_px: float, dy_px: float = 0.0) -> tuple[np.ndarray, np.ndarray]: + """Build Nx2 prev / curr point clouds with a uniform pixel-flow shift.""" + rng = np.random.default_rng(seed=2026) + base = rng.uniform(100.0, 1800.0, size=(n, 2)) + prev = base.copy() + curr = base + np.array([dx_px, dy_px], dtype=np.float64) + return prev, curr + + +# ---------------------------------------------------------------------- +# Happy path + + +def test_nadir_identity_rotation_with_known_gsd_yields_expected_scale() -> None: + # Arrange — at AGL=100 m, fx=1680 px → GSD ≈ 0.0595 m/px. A 20 px + # mean flow corresponds to ≈1.19 m of horizontal motion. t_unit + # along +X means t_horizontal = 1; scale_factor must equal 1.19. + t_unit = np.array([1.0, 0.0, 0.0], dtype=np.float64) + pts_prev, pts_curr = _uniform_flow(n=64, dx_px=20.0) + + # Act + result = recover_scale( + t_unit=t_unit, + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=100.0, + ) + + # Assert + assert isinstance(result, ScaleRecoveryResult) + expected_disp_m = 20.0 * 100.0 / 1680.4469 + assert result.scale_factor == pytest.approx(expected_disp_m, rel=1e-9) + assert result.scale_uncertainty_m == pytest.approx( + expected_disp_m * RELATIVE_DISPLACEMENT_NOISE, rel=1e-9 + ) + assert result.is_recoverable() + # AZ-921 — happy-path categorisation is "metric". + assert result.quality == "metric" + + +def test_diagonal_flow_uses_l2_norm_for_mean_magnitude() -> None: + # Arrange — (dx=3, dy=4) flow has L2 magnitude 5 px per feature. + t_unit = np.array([1.0, 0.0, 0.0], dtype=np.float64) + pts_prev, pts_curr = _uniform_flow(n=50, dx_px=3.0, dy_px=4.0) + + # Act + result = recover_scale( + t_unit=t_unit, + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=100.0, + ) + + # Assert + expected_disp_m = 5.0 * 100.0 / 1680.4469 + assert result.scale_factor == pytest.approx(expected_disp_m, rel=1e-9) + + +def test_off_axis_t_unit_uses_horizontal_component_magnitude() -> None: + # Arrange — t_unit splits 0.6/0.8 between camera-X and camera-Y; + # the horizontal magnitude is sqrt(0.36 + 0.64) = 1.0. The scale + # therefore stays the displacement / 1.0. + t_unit = np.array([0.6, 0.8, 0.0], dtype=np.float64) + pts_prev, pts_curr = _uniform_flow(n=40, dx_px=10.0) + + # Act + result = recover_scale( + t_unit=t_unit, + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=120.0, + ) + + # Assert + expected_disp_m = 10.0 * 120.0 / 1680.4469 + assert result.scale_factor == pytest.approx(expected_disp_m, rel=1e-9) + + +# ---------------------------------------------------------------------- +# Degenerate / fallback paths — each must produce inf uncertainty. + + +def test_agl_none_returns_infinite_uncertainty() -> None: + pts_prev, pts_curr = _uniform_flow(n=32, dx_px=20.0) + + result = recover_scale( + t_unit=np.array([1.0, 0.0, 0.0]), + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=None, + ) + + assert result.scale_factor == 0.0 + assert result.scale_uncertainty_m == INFINITE_SCALE_UNCERTAINTY_M + assert not result.is_recoverable() + # AZ-921 — AGL-unknown is "unknown", not "direction_only". + assert result.quality == "unknown" + + +def test_zero_or_negative_agl_returns_infinite_uncertainty() -> None: + pts_prev, pts_curr = _uniform_flow(n=32, dx_px=20.0) + + for bad_agl in (0.0, -1.0): + result = recover_scale( + t_unit=np.array([1.0, 0.0, 0.0]), + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=bad_agl, + ) + + assert math.isinf(result.scale_uncertainty_m), f"agl={bad_agl}" + + +def test_near_pure_vertical_t_unit_returns_infinite_uncertainty() -> None: + # Arrange — t along the optical axis (camera-Z) only; the GSD + # method cannot tie horizontal pixel flow to vertical motion. + pts_prev, pts_curr = _uniform_flow(n=40, dx_px=5.0) + + result = recover_scale( + t_unit=np.array([0.01, 0.01, 0.9998]), + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=100.0, + ) + + assert not result.is_recoverable() + # AZ-921 — near-vertical motion routes to direction_only: the + # direction of t_unit is still meaningful, only the magnitude is + # not derivable from horizontal pixel flow. + assert result.quality == "direction_only" + + +def test_zero_inlier_flow_returns_infinite_uncertainty() -> None: + # Arrange — stationary inliers (dx=dy=0). Cannot disambiguate the + # scale of a zero motion. + pts_prev, pts_curr = _uniform_flow(n=40, dx_px=0.0, dy_px=0.0) + + result = recover_scale( + t_unit=np.array([1.0, 0.0, 0.0]), + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=100.0, + ) + + assert not result.is_recoverable() + # AZ-921 — stale features routes to "unknown", since neither the + # magnitude nor the direction of motion is trustworthy. + assert result.quality == "unknown" + + +def test_empty_inlier_set_returns_infinite_uncertainty() -> None: + empty = np.zeros((0, 2), dtype=np.float64) + + result = recover_scale( + t_unit=np.array([1.0, 0.0, 0.0]), + pts_prev=empty, + pts_curr=empty, + intrinsics_3x3=_intrinsics(), + agl_m=100.0, + ) + + assert not result.is_recoverable() + + +def test_zero_focal_length_returns_infinite_uncertainty() -> None: + pts_prev, pts_curr = _uniform_flow(n=32, dx_px=20.0) + + result = recover_scale( + t_unit=np.array([1.0, 0.0, 0.0]), + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(fx=0.0), + agl_m=100.0, + ) + + assert not result.is_recoverable() + + +# ---------------------------------------------------------------------- +# Numerical stability with a Derkachi-grade inlier set + + +def test_derkachi_grade_inlier_count_yields_stable_scale() -> None: + # Arrange — 120 inliers (representative of the Derkachi cruise + # observation) with a small per-feature noise around a uniform + # 30 px optical flow at 150 m AGL. + rng = np.random.default_rng(seed=271828) + base = rng.uniform(100.0, 1800.0, size=(120, 2)) + flow_mean_px = 30.0 + noise = rng.normal(0.0, 0.5, size=(120, 2)) + pts_prev = base + pts_curr = base + np.array([flow_mean_px, 0.0]) + noise + t_unit = np.array([1.0, 0.0, 0.0]) + agl_m = 150.0 + + # Act + result = recover_scale( + t_unit=t_unit, + pts_prev=pts_prev, + pts_curr=pts_curr, + intrinsics_3x3=_intrinsics(), + agl_m=agl_m, + ) + + # Assert — observed scale must be within 5 % of the analytic value + # at this noise level (sigma = 0.5 px over 120 inliers). + expected = flow_mean_px * agl_m / 1680.4469 + assert result.scale_factor == pytest.approx(expected, rel=5e-2) + assert result.is_recoverable() diff --git a/tests/unit/c1_vio/test_az920_klt_ransac_scale_integration.py b/tests/unit/c1_vio/test_az920_klt_ransac_scale_integration.py new file mode 100644 index 0000000..7b28ecb --- /dev/null +++ b/tests/unit/c1_vio/test_az920_klt_ransac_scale_integration.py @@ -0,0 +1,319 @@ +"""AZ-920 — `KltRansacStrategy` x `AltitudeProvider` integration coverage. + +Pins the contract that ``process_frame`` consumes the +:class:`AltitudeProvider` to recover metric scale on the +otherwise-unit translation that ``cv2.recoverPose`` emits. Pure +function math is covered in ``test_az920_gsd_scale.py``; this file +covers the wiring at the strategy boundary. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.nav import ( + ImuSample, + ImuWindow, + NavCameraFrame, +) +from gps_denied_onboard.components.c1_vio import C1VioConfig, KltRansacConfig +from gps_denied_onboard.components.c1_vio import klt_ransac as klt_ransac_module +from gps_denied_onboard.components.c1_vio._gsd_scale import RELATIVE_DISPLACEMENT_NOISE +from gps_denied_onboard.components.c1_vio.klt_ransac import ( + _MIN_POSITION_VARIANCE_M2, + KltRansacStrategy, +) +from gps_denied_onboard.config.schema import Config, RuntimeConfig +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.helpers.altitude_provider import AltitudeProvider +from gps_denied_onboard.helpers.ransac_filter import RansacResult + +# ---------------------------------------------------------------------- +# Test scaffolding + + +class _ConstantAgl(AltitudeProvider): + """Returns a fixed AGL on every call.""" + + def __init__(self, agl_m: float | None) -> None: + self._agl_m = agl_m + + def agl_m(self, now_ns: int) -> float | None: + return self._agl_m + + +def _calibration(focal_length: float = 1680.0) -> CameraCalibration: + return CameraCalibration( + camera_id="khp20s30-test", + intrinsics_3x3=np.array( + [ + [focal_length, 0.0, 960.0], + [0.0, focal_length, 540.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ), + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_az920", + ) + + +def _frame(idx: int) -> NavCameraFrame: + rng = np.random.default_rng(seed=idx) + image = (rng.integers(0, 255, size=(240, 240), dtype=np.int16)).astype(np.uint8) + return NavCameraFrame( + frame_id=idx, + timestamp=datetime.fromtimestamp(idx * 0.1, tz=timezone.utc), + image=image, + camera_calibration_id="khp20s30-test", + ) + + +def _imu_window(frame_idx: int) -> ImuWindow: + ts_start_ns = 1_000_000_000 + frame_idx * 100_000_000 + samples = tuple( + ImuSample( + ts_ns=ts_start_ns + i * 5_000_000, + accel_xyz=(0.0, 0.0, 9.81), + gyro_xyz=(0.0, 0.0, 0.0), + ) + for i in range(3) + ) + return ImuWindow( + samples=samples, + ts_start_ns=samples[0].ts_ns, + ts_end_ns=samples[-1].ts_ns, + ) + + +def _config() -> Config: + return Config.with_blocks( + c1_vio=C1VioConfig( + strategy="klt_ransac", + klt_ransac=KltRansacConfig(), + lost_frame_threshold=3, + ), + runtime=RuntimeConfig(), + ) + + +def _patch_pose_recovery( + monkeypatch: pytest.MonkeyPatch, + *, + flow_px: float, + inlier_count: int = 40, + t_unit: tuple[float, float, float] = (1.0, 0.0, 0.0), +) -> None: + """Force the geometry stack to a controlled success path. + + Builds an inlier set with a uniform ``flow_px`` pixel shift in the + +x direction; ``cv2.findEssentialMat`` returns identity rotation + + an all-inlier mask; ``cv2.recoverPose`` returns identity rotation + + a unit-length translation in ``t_unit``. + """ + rng = np.random.default_rng(seed=314159) + base = rng.uniform(50.0, 1800.0, size=(inlier_count, 2)) + inliers = np.column_stack([base, base + np.array([flow_px, 0.0])]) + mask = np.ones((inlier_count, 1), dtype=np.uint8) + + def _fake_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: + return RansacResult( + inlier_correspondences=inliers, + inlier_count=inlier_count, + outlier_count=0, + median_residual_px=0.5, + ) + + def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, np.ndarray]: + return np.eye(3, dtype=np.float64), mask + + def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: + R = np.eye(3, dtype=np.float64) + t_col = np.asarray(t_unit, dtype=np.float64).reshape(3, 1) + return inlier_count, R, t_col, mask + + monkeypatch.setattr( + klt_ransac_module.RansacFilter, + "filter_correspondences", + staticmethod(_fake_filter), + ) + monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) + monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) + + +def _fdr_client() -> FdrClient: + return FdrClient(producer_id="test.az920", capacity=16, _emit_diag_log=False) + + +# ---------------------------------------------------------------------- +# AC-4 — translation gets metric scale when the provider yields valid AGL + + +def test_steady_state_frame_translation_is_metric_when_agl_known( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — 20 px mean flow at 100 m AGL, fx=1680. + # Expected scale_factor = 20 * 100 / 1680 ≈ 1.190 m. The unit t is + # along +X, so the emitted translation is ≈ (1.19, 0, 0) m. + _patch_pose_recovery(monkeypatch, flow_px=20.0) + fdr_client = _fdr_client() + provider = _ConstantAgl(agl_m=100.0) + strategy = KltRansacStrategy( + _config(), fdr_client=fdr_client, altitude_provider=provider + ) + calibration = _calibration(focal_length=1680.0) + + # Act — first frame: INIT (identity pose). Second frame: scale recovery fires. + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + pose_matrix = np.asarray(second.relative_pose_T.matrix()) + translation = pose_matrix[:3, 3] + expected_x = 20.0 * 100.0 / 1680.0 + assert translation[0] == pytest.approx(expected_x, rel=1e-3) + # Y / Z components are zero because t_unit was pure +X. + assert translation[1] == pytest.approx(0.0, abs=1e-9) + assert translation[2] == pytest.approx(0.0, abs=1e-9) + # Translation magnitude is metric, not unit-length. + assert float(np.linalg.norm(translation)) > 0.5 # metres, not 1.0 px-scale + # AZ-921 — scale_quality must report "metric" on the happy path. + assert second.scale_quality == "metric" + + +# ---------------------------------------------------------------------- +# AC-4 — position covariance is in m² when AGL is known + + +def test_position_covariance_block_is_metric_when_agl_known( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + _patch_pose_recovery(monkeypatch, flow_px=20.0) + provider = _ConstantAgl(agl_m=100.0) + strategy = KltRansacStrategy( + _config(), fdr_client=_fdr_client(), altitude_provider=provider + ) + calibration = _calibration(focal_length=1680.0) + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + cov = np.asarray(second.pose_covariance_6x6) + expected_disp_m = 20.0 * 100.0 / 1680.0 + expected_pos_var = max( + (expected_disp_m * RELATIVE_DISPLACEMENT_NOISE) ** 2, + _MIN_POSITION_VARIANCE_M2, + ) + pos_block = cov[0:3, 0:3] + assert np.allclose(pos_block, np.eye(3) * expected_pos_var, atol=1e-12) + + +# ---------------------------------------------------------------------- +# AZ-921 AC-7 — when AGL is unknown the strategy emits scale_quality +# = "unknown" and leaves the cov in geometry-natural units (the ESKF +# inflates R_meas; see test_az921_eskf_add_vio_scale_quality.py). + + +def test_scale_quality_unknown_when_agl_provider_returns_none( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + _patch_pose_recovery(monkeypatch, flow_px=20.0) + provider = _ConstantAgl(agl_m=None) + strategy = KltRansacStrategy( + _config(), fdr_client=_fdr_client(), altitude_provider=provider + ) + calibration = _calibration(focal_length=1680.0) + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + assert second.scale_quality == "unknown" + # Translation stays unit-length when scale could not be recovered. + pose_matrix = np.asarray(second.relative_pose_T.matrix()) + translation = pose_matrix[:3, 3] + assert float(np.linalg.norm(translation)) == pytest.approx(1.0, rel=1e-6) + # The position block of cov is NOT overridden by KLT/RANSAC any + # more (AZ-921 AC-7). The ESKF's R_meas adaptation provides the + # "do not trust" signal on the consumer side. + cov = np.asarray(second.pose_covariance_6x6) + pos_block = cov[0:3, 0:3] + assert float(pos_block[0, 0]) < 1.0 # NOT the AZ-920 1e6 sentinel + + +# ---------------------------------------------------------------------- +# AZ-921 AC-6 — direction_only branch reachable on near-vertical motion + + +def test_scale_quality_direction_only_on_near_pure_vertical_motion( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — realistic horizontal flow (20 px) but cv2.recoverPose + # returns a t_unit pointing essentially along the optical axis + # (the climb/descent edge case). Horizontal magnitude is far below + # the _MIN_T_HORIZONTAL_MAGNITUDE = 0.05 threshold. + _patch_pose_recovery( + monkeypatch, + flow_px=20.0, + t_unit=(0.001, 0.001, 0.99999), + ) + provider = _ConstantAgl(agl_m=100.0) + strategy = KltRansacStrategy( + _config(), fdr_client=_fdr_client(), altitude_provider=provider + ) + calibration = _calibration(focal_length=1680.0) + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + assert second.scale_quality == "direction_only" + # Translation magnitude stays at the cv2.recoverPose output — i.e. + # the t_unit input — because scale was not recovered. The mocked + # t_unit is (0.001, 0.001, 0.99999), L2 norm ≈ 0.9999. The test + # asserts the strategy did NOT multiply by a real metric scale. + pose_matrix = np.asarray(second.relative_pose_T.matrix()) + translation = pose_matrix[:3, 3] + assert float(np.linalg.norm(translation)) == pytest.approx(0.9999, abs=1e-3) + + +# ---------------------------------------------------------------------- +# AZ-919 AC-3 regression — provider=None keeps pre-AZ-920 behaviour + + +def test_translation_unchanged_and_cov_untouched_when_provider_is_none( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — same mock as before, but the strategy is built WITHOUT + # an altitude_provider (the pre-AZ-919 / legacy unit-test path). + _patch_pose_recovery(monkeypatch, flow_px=20.0) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration(focal_length=1680.0) + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert — translation stays at the unit-length t_unit (1, 0, 0). + pose_matrix = np.asarray(second.relative_pose_T.matrix()) + translation = pose_matrix[:3, 3] + assert float(np.linalg.norm(translation)) == pytest.approx(1.0, rel=1e-6) + # The position block of cov is the same scalar * I as the rotation + # block — no AZ-920 override fired. + cov = np.asarray(second.pose_covariance_6x6) + assert np.allclose(cov[0:3, 0:3], cov[3:6, 3:6], atol=1e-12) + # AZ-921 — without an altitude provider, the strategy defaults + # scale_quality to "unknown" so consumers know to inflate R_meas. + assert second.scale_quality == "unknown" diff --git a/tests/unit/c1_vio/test_az922_cheirality_gate.py b/tests/unit/c1_vio/test_az922_cheirality_gate.py new file mode 100644 index 0000000..1265acd --- /dev/null +++ b/tests/unit/c1_vio/test_az922_cheirality_gate.py @@ -0,0 +1,354 @@ +"""AZ-922 — cheirality / rotation-plausibility gate in KLT/RANSAC. + +Covers AC-1 (axis-angle helper math), AC-2/AC-3 (gate threshold + routing +through `_pose_recovery_failed`), and AC-4 (boundary + AC-7 lost-frame +escalation). AC-5 (Jetson e2e Derkachi past frame 5) is gated separately +via the Tier-2 harness — not in unit scope. +""" + +from __future__ import annotations + +import math +from datetime import datetime, timezone +from typing import Any + +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.nav import ( + ImuSample, + ImuWindow, + NavCameraFrame, +) +from gps_denied_onboard.components.c1_vio import C1VioConfig, KltRansacConfig +from gps_denied_onboard.components.c1_vio import klt_ransac as klt_ransac_module +from gps_denied_onboard.components.c1_vio.errors import ( + VioFatalError, + VioInitializingError, +) +from gps_denied_onboard.components.c1_vio.klt_ransac import ( + KltRansacStrategy, + _rotation_angle_rad, +) +from gps_denied_onboard.config.schema import Config, RuntimeConfig +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.helpers.ransac_filter import RansacResult + +# ---------------------------------------------------------------------- +# AC-1 — pure helper math +# ---------------------------------------------------------------------- + + +def test_rotation_angle_identity_is_zero() -> None: + # Assert + assert _rotation_angle_rad(np.eye(3, dtype=np.float64)) == pytest.approx(0.0, abs=1e-12) + + +def test_rotation_angle_180_about_x_is_pi() -> None: + # Arrange — R rotates 180° about the body-X axis: trace = 1 + (-1) + (-1) = -1. + R = np.array( + [ + [1.0, 0.0, 0.0], + [0.0, -1.0, 0.0], + [0.0, 0.0, -1.0], + ], + dtype=np.float64, + ) + + # Assert + assert _rotation_angle_rad(R) == pytest.approx(math.pi, abs=1e-9) + + +def test_rotation_angle_general_diagonal_recovers_axis_angle_magnitude() -> None: + # Arrange — 30° rotation about body-Z: trace = cos(30)+cos(30)+1. + theta = math.radians(30.0) + R = np.array( + [ + [math.cos(theta), -math.sin(theta), 0.0], + [math.sin(theta), math.cos(theta), 0.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + + # Assert + assert _rotation_angle_rad(R) == pytest.approx(theta, abs=1e-9) + + +def test_rotation_angle_arccos_argument_clipped_against_float_drift() -> None: + # Arrange — synthesise an R whose trace is *just* above 3.0 (impossible + # in exact arithmetic but achievable via float drift). Without the + # arccos clip this would yield NaN; with the clip it must return 0. + R = np.eye(3, dtype=np.float64) + np.full((3, 3), 1e-15, dtype=np.float64) + + # Assert + assert _rotation_angle_rad(R) == pytest.approx(0.0, abs=1e-6) + assert math.isfinite(_rotation_angle_rad(R)) + + +def test_rotation_angle_uses_jetson_frame_5_diagonal_signature() -> None: + # Arrange — exact R diagonal observed at Derkachi frame 5 (the + # divergence point). Trace = -0.848 - 0.639 + 0.487 = -1.0. + R = np.diag([-0.848, -0.639, 0.487]).astype(np.float64) + + # Assert — recover the 180° rotation the ESKF traceback implied. + assert _rotation_angle_rad(R) == pytest.approx(math.pi, abs=1e-9) + + +# ---------------------------------------------------------------------- +# Strategy-level scaffolding (mirrors test_az920_klt_ransac_scale_integration.py) +# ---------------------------------------------------------------------- + + +def _calibration() -> CameraCalibration: + return CameraCalibration( + camera_id="khp20s30-test", + intrinsics_3x3=np.array( + [ + [1680.0, 0.0, 960.0], + [0.0, 1680.0, 540.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ), + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=np.eye(4, dtype=np.float64), + acquisition_method="test_az922", + ) + + +def _frame(idx: int) -> NavCameraFrame: + rng = np.random.default_rng(seed=idx) + image = (rng.integers(0, 255, size=(240, 240), dtype=np.int16)).astype(np.uint8) + return NavCameraFrame( + frame_id=idx, + timestamp=datetime.fromtimestamp(idx * 0.1, tz=timezone.utc), + image=image, + camera_calibration_id="khp20s30-test", + ) + + +def _imu_window(frame_idx: int) -> ImuWindow: + ts_start_ns = 1_000_000_000 + frame_idx * 100_000_000 + samples = tuple( + ImuSample( + ts_ns=ts_start_ns + i * 5_000_000, + accel_xyz=(0.0, 0.0, 9.81), + gyro_xyz=(0.0, 0.0, 0.0), + ) + for i in range(3) + ) + return ImuWindow( + samples=samples, + ts_start_ns=samples[0].ts_ns, + ts_end_ns=samples[-1].ts_ns, + ) + + +def _config(lost_frame_threshold: int = 3) -> Config: + return Config.with_blocks( + c1_vio=C1VioConfig( + strategy="klt_ransac", + klt_ransac=KltRansacConfig(), + lost_frame_threshold=lost_frame_threshold, + ), + runtime=RuntimeConfig(), + ) + + +def _patch_pose_recovery_with_rotation( + monkeypatch: pytest.MonkeyPatch, + *, + R_3x3: np.ndarray, + inlier_count: int = 40, +) -> None: + """Force the geometry stack to a controlled path with a chosen R. + + Mirrors the AZ-920 test helper but takes the rotation matrix as a + parameter so AZ-922 can exercise the cheirality gate directly. + """ + rng = np.random.default_rng(seed=271828) + base = rng.uniform(50.0, 1800.0, size=(inlier_count, 2)) + inliers = np.column_stack([base, base + np.array([5.0, 0.0])]) + mask = np.ones((inlier_count, 1), dtype=np.uint8) + + def _fake_filter(_corr: np.ndarray, _thresh: float, _min: int) -> RansacResult: + return RansacResult( + inlier_correspondences=inliers, + inlier_count=inlier_count, + outlier_count=0, + median_residual_px=0.5, + ) + + def _fake_find_essential(*_a: Any, **_k: Any) -> tuple[np.ndarray, np.ndarray]: + return np.eye(3, dtype=np.float64), mask + + def _fake_recover_pose(*_a: Any, **_k: Any) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: + t_col = np.array([[1.0], [0.0], [0.0]], dtype=np.float64) + return inlier_count, R_3x3.astype(np.float64), t_col, mask + + monkeypatch.setattr( + klt_ransac_module.RansacFilter, + "filter_correspondences", + staticmethod(_fake_filter), + ) + monkeypatch.setattr(klt_ransac_module.cv2, "findEssentialMat", _fake_find_essential) + monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _fake_recover_pose) + + +def _fdr_client() -> FdrClient: + return FdrClient(producer_id="test.az922", capacity=16, _emit_diag_log=False) + + +def _r_about_z(theta_rad: float) -> np.ndarray: + return np.array( + [ + [math.cos(theta_rad), -math.sin(theta_rad), 0.0], + [math.sin(theta_rad), math.cos(theta_rad), 0.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + + +# ---------------------------------------------------------------------- +# AC-2/AC-3 — gate threshold + routing through _pose_recovery_failed +# ---------------------------------------------------------------------- + + +def test_identity_rotation_passes_gate(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange + _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=np.eye(3)) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + assert second is not None + assert second.frame_id == "2" + + +def test_five_degree_rotation_passes_gate(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — 5° rotation, well below the 30° default threshold. + _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=_r_about_z(math.radians(5.0))) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + assert second is not None + + +def test_sixty_degree_rotation_rejected_by_gate(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — 60° rotation exceeds the 30° default threshold. + _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=_r_about_z(math.radians(60.0))) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + Assert + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): + strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + +def test_180_degree_rotation_rejected_by_gate(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — synthesise the Jetson-observed frame-5 R signature. + R_180 = np.diag([-0.848, -0.639, 0.487]).astype(np.float64) + _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=R_180) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + Assert + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): + strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + +def test_threshold_boundary_just_above_rejects(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — 30.1° vs 30° threshold. + _patch_pose_recovery_with_rotation( + monkeypatch, R_3x3=_r_about_z(math.radians(30.1)) + ) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + Assert + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): + strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + +def test_threshold_boundary_just_below_passes(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — 29.9° just below 30° threshold. + _patch_pose_recovery_with_rotation( + monkeypatch, R_3x3=_r_about_z(math.radians(29.9)) + ) + strategy = KltRansacStrategy(_config(), fdr_client=_fdr_client()) + calibration = _calibration() + + # Act + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + second = strategy.process_frame(_frame(idx=2), _imu_window(2), calibration) + + # Assert + assert second is not None + + +# ---------------------------------------------------------------------- +# AC-4 (AC-7 path) — consecutive rejections escalate to VioFatalError +# ---------------------------------------------------------------------- + + +def test_consecutive_rejections_eventually_raise_vio_fatal( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — every frame yields a 180° flip; lost_frame_threshold=3 + # means the 4th frame in this stream (the 3rd consecutive failure) + # crosses the LOST gate. + R_180 = np.diag([-1.0, -1.0, 1.0]).astype(np.float64) # 180° about Z + _patch_pose_recovery_with_rotation(monkeypatch, R_3x3=R_180) + strategy = KltRansacStrategy( + _config(lost_frame_threshold=3), fdr_client=_fdr_client() + ) + calibration = _calibration() + + # Act — frame 1 seeds INIT; frames 2-4 hit the gate and tick lost. + strategy.process_frame(_frame(idx=1), _imu_window(1), calibration) + for idx in range(2, 4): + with pytest.raises(VioInitializingError, match="implausible_rotation_angle"): + strategy.process_frame(_frame(idx=idx), _imu_window(idx), calibration) + # The 4th consecutive failed frame trips the LOST/VioFatalError gate. + with pytest.raises(VioFatalError, match="exhausted lost-frame budget"): + strategy.process_frame(_frame(idx=4), _imu_window(4), calibration) + + +# ---------------------------------------------------------------------- +# Config validation — defends the (0, π] range +# ---------------------------------------------------------------------- + + +def test_config_rejects_zero_or_negative_threshold() -> None: + # Act + Assert + with pytest.raises(Exception, match="max_frame_rotation_rad must be in"): + KltRansacConfig(max_frame_rotation_rad=0.0) + + +def test_config_rejects_threshold_above_pi() -> None: + # Act + Assert + with pytest.raises(Exception, match="max_frame_rotation_rad must be in"): + KltRansacConfig(max_frame_rotation_rad=math.pi + 0.01) + + +def test_config_accepts_pi_exactly() -> None: + # Act — the inclusive upper bound. No rotation can exceed π anyway. + cfg = KltRansacConfig(max_frame_rotation_rad=math.pi) + + # Assert + assert cfg.max_frame_rotation_rad == pytest.approx(math.pi) diff --git a/tests/unit/c1_vio/test_klt_ransac_strategy.py b/tests/unit/c1_vio/test_klt_ransac_strategy.py index 08948b8..f28465f 100644 --- a/tests/unit/c1_vio/test_klt_ransac_strategy.py +++ b/tests/unit/c1_vio/test_klt_ransac_strategy.py @@ -53,7 +53,6 @@ from gps_denied_onboard.config.schema import Config, RuntimeConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import FdrRecord - # ---------------------------------------------------------------------- # Helpers — keep boilerplate out of the AC test bodies. @@ -528,6 +527,19 @@ def test_ac6_low_inlier_count_emits_degraded_with_monotonic_covariance( calibration, ) + # AZ-922 — pin cv2.recoverPose to identity rotation for frame 2 too. + # Real cv2 on this synthetic input returns a near-180° rotation that + # the cheirality gate would reject; this test asserts FACADE state + # classification on inlier loss, not cv2's geometry on synthetic data. + def _identity_recover_pose( + *_a: Any, **_k: Any + ) -> tuple[int, np.ndarray, np.ndarray, np.ndarray]: + R = np.eye(3, dtype=np.float64) + t = np.array([[0.01], [0.0], [0.0]], dtype=np.float64) + return 1, R, t, np.ones((1, 1), dtype=np.uint8) + + monkeypatch.setattr(klt_ransac_module.cv2, "recoverPose", _identity_recover_pose) + # Frame 2 — first successful pose recovery (TRACKING). out_tracking = strategy.process_frame( _frame(idx=2, image=_synthetic_frame_image(seed=31, shift_x=3)), diff --git a/tests/unit/c5_state/test_az921_eskf_add_vio_scale_quality.py b/tests/unit/c5_state/test_az921_eskf_add_vio_scale_quality.py new file mode 100644 index 0000000..2cd9a86 --- /dev/null +++ b/tests/unit/c5_state/test_az921_eskf_add_vio_scale_quality.py @@ -0,0 +1,187 @@ +"""AZ-921 AC-5 — `EskfStateEstimator.add_vio` translation-R_meas +adaptation per `VioOutput.scale_quality`. + +Two layers of coverage: + +1. **Unit** — `_adapt_vio_r_meas` is the pure helper that owns the + block-wise adaptation. Three tests pin its three branches. +2. **Integration** — `EskfStateEstimator.add_vio` is run end-to-end + with each `scale_quality` branch and the resulting nominal position + delta is compared. ``"metric"`` produces a large position update + (the Kalman gain trusts the measurement); ``"unknown"`` produces an + essentially-zero position update (the gain block is ~0). +""" + +from __future__ import annotations + +import dataclasses +from typing import Any +from unittest import mock + +import numpy as np +import pytest + +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + ScaleQuality, + VioOutput, +) +from gps_denied_onboard.components.c5_state import C5StateConfig +from gps_denied_onboard.components.c5_state.eskf_baseline import ( + _DIRECTION_ONLY_TRANSLATION_SIGMA_M, + _UNKNOWN_TRANSLATION_SIGMA_M, + EskfStateEstimator, + _adapt_vio_r_meas, +) +from gps_denied_onboard.config import load_config +from gps_denied_onboard.config.schema import Config + +# ---------------------------------------------------------------------- +# _adapt_vio_r_meas helper — unit coverage. + + +def test_adapt_vio_r_meas_metric_returns_input_unchanged() -> None: + # Arrange + r_meas = np.diag([0.04, 0.04, 0.04, 0.0001, 0.0001, 0.0001]) + + # Act + adapted = _adapt_vio_r_meas(r_meas, "metric") + + # Assert + assert np.array_equal(adapted, r_meas) + # Identity is acceptable too — the caller MUST NOT mutate either way. + assert np.array_equal(r_meas, np.diag([0.04, 0.04, 0.04, 0.0001, 0.0001, 0.0001])) + + +def test_adapt_vio_r_meas_direction_only_overrides_translation_block() -> None: + # Arrange + r_meas = np.diag([0.04, 0.04, 0.04, 0.0001, 0.0001, 0.0001]) + expected_translation_var = _DIRECTION_ONLY_TRANSLATION_SIGMA_M**2 + + # Act + adapted = _adapt_vio_r_meas(r_meas, "direction_only") + + # Assert — translation block becomes diag(sigma^2 * I), rotation + # block left as-is. + assert np.allclose(adapted[0:3, 0:3], np.eye(3) * expected_translation_var) + assert np.allclose(adapted[3:6, 3:6], r_meas[3:6, 3:6]) + # And the input was NOT mutated. + assert r_meas[0, 0] == pytest.approx(0.04) + + +def test_adapt_vio_r_meas_unknown_inflates_translation_far_beyond_direction_only() -> None: + # Arrange + r_meas = np.diag([0.04, 0.04, 0.04, 0.0001, 0.0001, 0.0001]) + expected_translation_var = _UNKNOWN_TRANSLATION_SIGMA_M**2 + + # Act + adapted = _adapt_vio_r_meas(r_meas, "unknown") + + # Assert + assert np.allclose(adapted[0:3, 0:3], np.eye(3) * expected_translation_var) + assert np.allclose(adapted[3:6, 3:6], r_meas[3:6, 3:6]) + # And the input was NOT mutated. + assert r_meas[0, 0] == pytest.approx(0.04) + + +def test_adapt_vio_r_meas_unknown_translation_var_is_strictly_larger() -> None: + # Sanity: an "unknown" measurement must be treated as strictly less + # informative than a "direction_only" one (otherwise the three-tier + # split is meaningless). + assert _UNKNOWN_TRANSLATION_SIGMA_M > _DIRECTION_ONLY_TRANSLATION_SIGMA_M + + +# ---------------------------------------------------------------------- +# add_vio integration — end-to-end position-update sensitivity per +# scale_quality. + + +def _build_config(**state_overrides: Any) -> Config: + cfg = load_config(env={}, paths=(), require_env=False) + new_state = dataclasses.replace(C5StateConfig(strategy="eskf"), **state_overrides) + components = dict(cfg.components or {}) + components["c5_state"] = new_state + return dataclasses.replace(cfg, components=components) + + +def _make_estimator() -> EskfStateEstimator: + return EskfStateEstimator( + _build_config(), + imu_preintegrator=mock.MagicMock(), + se3_utils=mock.MagicMock(), + wgs_converter=mock.MagicMock(), + fdr_client=mock.MagicMock(), + ) + + +def _identity_pose() -> np.ndarray: + return np.eye(4, dtype=np.float64) + + +def _pose_translated(x: float) -> np.ndarray: + p = np.eye(4, dtype=np.float64) + p[0, 3] = x + return p + + +_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) +_NEUTRAL_FQ = FeatureQuality(tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0) + + +def _vio( + frame_id: int, + ts_ns: int, + pose: np.ndarray, + scale_quality: ScaleQuality, +) -> VioOutput: + import gtsam + return VioOutput( + frame_id=str(frame_id), + relative_pose_T=gtsam.Pose3(pose), + pose_covariance_6x6=np.eye(6) * 0.01, + imu_bias=_ZERO_BIAS, + feature_quality=_NEUTRAL_FQ, + emitted_at_ns=int(ts_ns), + scale_quality=scale_quality, + ) + + +def _position_delta_after_vio_pair(scale_quality: ScaleQuality) -> float: + # Arrange — two VIO frames; first seeds, second reports a +1 m + # forward translation. Returns the magnitude of the nominal-pos + # update the ESKF applied. With scale_quality="metric" the Kalman + # gain in the position block is ~1 and the update is ~1 m. + estimator = _make_estimator() + estimator.add_vio(_vio(1, 1_000_000_000, _identity_pose(), scale_quality)) + pos_before = estimator._nominal_pos.copy() + estimator.add_vio(_vio(2, 1_100_000_000, _pose_translated(1.0), scale_quality)) + + # Act / Assert wrapper — returns the metric (m) the caller compares. + return float(np.linalg.norm(estimator._nominal_pos - pos_before)) + + +def test_metric_scale_quality_applies_full_position_update() -> None: + # Assert — with metric scale, the ESKF trusts the +1 m measurement + # almost completely (R_meas[0:3, 0:3] = 0.01 m^2 = 10 cm sigma). + metric_delta = _position_delta_after_vio_pair("metric") + assert metric_delta > 0.5 + + +def test_unknown_scale_quality_yields_near_zero_position_update() -> None: + # Assert — with unknown scale, the ESKF inflates R_meas to (1000 m)^2 + # so the Kalman gain block is ~0 and the +1 m measurement does + # essentially nothing to the position state. + unknown_delta = _position_delta_after_vio_pair("unknown") + assert unknown_delta < 1.0e-3 + + +def test_direction_only_scale_quality_yields_partial_update_between_metric_and_unknown() -> None: + # Assert — direction_only sits between the two extremes. The + # specific magnitude depends on the prior covariance + chosen + # sigma, but the ordering is what matters. + metric_delta = _position_delta_after_vio_pair("metric") + direction_only_delta = _position_delta_after_vio_pair("direction_only") + unknown_delta = _position_delta_after_vio_pair("unknown") + + assert unknown_delta < direction_only_delta < metric_delta diff --git a/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py b/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py index b794fae..a45eb36 100644 --- a/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py +++ b/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py @@ -86,8 +86,19 @@ def test_ac1_ap_raw_imu_decode() -> None: frame = received[0] assert frame.kind is TelemetryKind.IMU_SAMPLE assert isinstance(frame.payload, ImuTelemetrySample) - assert frame.payload.accel_xyz == (10.0, 20.0, -981.0) - assert frame.payload.gyro_xyz == (1.0, 2.0, 3.0) + # AZ-918: raw MAVLink (xacc=10, yacc=20, zacc=-981) mG FRD → + # SI/FLU m/s² via mavlink_imu_to_si_flu (negate Y, negate Z, *9.80665e-3). + # zacc raw is -981 mG; FRD→FLU negates Z, giving +981 mG = +9.62 m/s². + assert frame.payload.accel_xyz == pytest.approx(( + 10 * 9.80665e-3, + -20 * 9.80665e-3, + 981 * 9.80665e-3, + )) + assert frame.payload.gyro_xyz == pytest.approx(( + 1 * 1.0e-3, + -2 * 1.0e-3, + -3 * 1.0e-3, + )) # received_at is monotonic_ns at decode boundary (positive, non-zero) assert frame.received_at > 0 @@ -621,7 +632,12 @@ def test_ac10_corrupt_frame_does_not_kill_decoder( assert len(err_logs) == 1 imu_frames = [f for f in received if f.kind is TelemetryKind.IMU_SAMPLE] assert len(imu_frames) == 1 - assert imu_frames[0].payload.accel_xyz == (1.0, 2.0, 3.0) + # AZ-918: raw MAVLink (xacc=1, yacc=2, zacc=3) mG FRD → SI/FLU. + assert imu_frames[0].payload.accel_xyz == pytest.approx(( + 1 * 9.80665e-3, + -2 * 9.80665e-3, + -3 * 9.80665e-3, + )) # ---------------------------------------------------------------------- diff --git a/tests/unit/c8_fc_adapter/test_az399_tlog_replay_adapter.py b/tests/unit/c8_fc_adapter/test_az399_tlog_replay_adapter.py index 82dc3a8..c280630 100644 --- a/tests/unit/c8_fc_adapter/test_az399_tlog_replay_adapter.py +++ b/tests/unit/c8_fc_adapter/test_az399_tlog_replay_adapter.py @@ -12,7 +12,6 @@ faked via :class:`_FakeTlog` so tests run without the real C extension. from __future__ import annotations -import os import time from pathlib import Path from types import SimpleNamespace @@ -50,7 +49,6 @@ from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( TlogReplayFcAdapter, ) - # ---------------------------------------------------------------------- # Fixtures + helpers @@ -379,7 +377,19 @@ def test_ac2_ap_dialect_frame_mapping_in_tlog_order( TelemetryKind.MAV_STATE, ] assert isinstance(received[0].payload, ImuTelemetrySample) - assert received[0].payload.accel_xyz == (10.0, 20.0, -981.0) + # AZ-918: raw MAVLink (xacc=10, yacc=20, zacc=-981) mG FRD → + # SI/FLU m/s² via mavlink_imu_to_si_flu (negate Y, negate Z, *9.80665e-3). + # zacc raw is -981 mG; FRD→FLU negates Z, giving +981 mG = +9.62 m/s². + assert received[0].payload.accel_xyz == pytest.approx(( + 10 * 9.80665e-3, + -20 * 9.80665e-3, + 981 * 9.80665e-3, + )) + assert received[0].payload.gyro_xyz == pytest.approx(( + 1 * 1.0e-3, + -2 * 1.0e-3, + -3 * 1.0e-3, + )) assert isinstance(received[1].payload, AttitudeSample) assert received[1].payload.yaw_rad == pytest.approx(1.5) assert isinstance(received[2].payload, GpsHealth) diff --git a/tests/unit/replay_input/test_csv_ground_truth.py b/tests/unit/replay_input/test_csv_ground_truth.py index ec434ea..8ebb531 100644 --- a/tests/unit/replay_input/test_csv_ground_truth.py +++ b/tests/unit/replay_input/test_csv_ground_truth.py @@ -26,7 +26,6 @@ from gps_denied_onboard.replay_input.csv_ground_truth import ( ) from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError - _DERKACHI_CSV: Path = ( Path(__file__).resolve().parents[3] / "_docs" @@ -163,11 +162,13 @@ def test_paired_imu_and_gps_share_clock(tmp_path: Path) -> None: def test_gps_unit_conversion(tmp_path: Path) -> None: - # Arrange — values exercise the deg/mm/cm-s/cdeg conversions. + # Arrange — values exercise the deg/mm/cm-s/cdeg conversions on + # the GPS columns and the mG/mrad-s + FRD→FLU conversion on the + # IMU columns (AZ-918). header = _full_header() row = ",".join([ "0.0", "0.0", - "10", "-3", "-980", "50", "30", "-5", # IMU stays raw + "10", "-3", "-980", "50", "30", "-5", # IMU raw (mG/mrad·s⁻¹/FRD) "50.0809634", # lat already in degrees "36.1115442", # lon already in degrees "141290", # alt in mm → 141.290 m @@ -181,7 +182,7 @@ def test_gps_unit_conversion(tmp_path: Path) -> None: # Act gt = load_csv_ground_truth(csv) - # Assert + # Assert — GPS in SI / decimal-degrees. fix = gt.records[0] assert fix.lat_deg == pytest.approx(50.0809634) assert fix.lon_deg == pytest.approx(36.1115442) @@ -190,6 +191,21 @@ def test_gps_unit_conversion(tmp_path: Path) -> None: assert fix.vy_m_s == pytest.approx(6.0) assert fix.vz_m_s == pytest.approx(-0.88) assert fix.hdg_deg == pytest.approx(350.41) + # Assert — IMU converted to m/s² + rad/s, body frame FLU. + imu = gt.imu_samples[0] + # AZ-918: CSV ships MAVLink wire format (mG/mrad/s, FRD body); the + # parser routes through mavlink_imu_to_si_flu so consumers see SI/FLU. + # FRD→FLU negates Y and Z, so a raw -3 (yacc) / -980 (zacc) become +3 / +980. + assert imu.accel_xyz == pytest.approx(( + 10 * 9.80665e-3, + 3 * 9.80665e-3, + 980 * 9.80665e-3, + )) + assert imu.gyro_xyz == pytest.approx(( + 50 * 1.0e-3, + -30 * 1.0e-3, + 5 * 1.0e-3, + )) # --------------------------------------------------------------------- diff --git a/tests/unit/runtime_root/test_az919_altitude_provider_wiring.py b/tests/unit/runtime_root/test_az919_altitude_provider_wiring.py new file mode 100644 index 0000000..22f15fe --- /dev/null +++ b/tests/unit/runtime_root/test_az919_altitude_provider_wiring.py @@ -0,0 +1,95 @@ +"""AZ-919 — `_c1_vio_wrapper` builds and forwards an `EskfNominalAltitudeProvider`. + +Tests the composition-root seam: the AZ-919 plumbing is only useful if +the wrapper actually constructs the provider and passes it to +``build_vio_strategy``. The provider's supplier must close over the same +mutable ``constructed`` dict that the bootstrap loop populates so it +sees the C5 estimator after the topo order builds it. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from gps_denied_onboard.config import Config +from gps_denied_onboard.helpers.altitude_provider import ( + AltitudeProvider, + EskfNominalAltitudeProvider, +) +from gps_denied_onboard.runtime_root import airborne_bootstrap +from gps_denied_onboard.runtime_root.airborne_bootstrap import _c1_vio_wrapper + + +def _minimal_config() -> Config: + return Config.with_blocks(c1_vio={"strategy": "klt_ransac"}) + + +def test_wrapper_forwards_altitude_provider_kwarg( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — capture the kwargs handed to build_vio_strategy. + captured: dict[str, Any] = {} + + def _capture(_config: Config, **kwargs: Any) -> object: + captured.update(kwargs) + return object() + + monkeypatch.setattr(airborne_bootstrap, "build_vio_strategy", _capture) + constructed: dict[str, Any] = {"c13_fdr": object()} + + # Act + _c1_vio_wrapper(_minimal_config(), constructed) + + # Assert + assert "altitude_provider" in captured + assert isinstance(captured["altitude_provider"], EskfNominalAltitudeProvider) + assert isinstance(captured["altitude_provider"], AltitudeProvider) + + +def test_provider_supplier_resolves_c5_estimator_lazily( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — capture the provider, then mutate the constructed dict + # after the wrapper returns to simulate C5 landing in the topo order. + captured: dict[str, Any] = {} + + def _capture(_config: Config, **kwargs: Any) -> object: + captured.update(kwargs) + return object() + + monkeypatch.setattr(airborne_bootstrap, "build_vio_strategy", _capture) + constructed: dict[str, Any] = {"c13_fdr": object()} + _c1_vio_wrapper(_minimal_config(), constructed) + provider: EskfNominalAltitudeProvider = captured["altitude_provider"] + + # Pre-C5 the supplier returns None. + pre_c5 = provider._estimator_supplier() + + # The bootstrap loop later assigns the c5_state slot. + sentinel_estimator = object() + constructed["c5_state"] = sentinel_estimator + post_c5 = provider._estimator_supplier() + + # Assert + assert pre_c5 is None + assert post_c5 is sentinel_estimator + + +def test_wrapper_still_requires_c13_fdr( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — AZ-919 must not silently relax the existing AC-618-3 + # invariant: an absent c13_fdr key still raises AirborneBootstrapError. + monkeypatch.setattr( + airborne_bootstrap, + "build_vio_strategy", + lambda *_a, **_kw: object(), + ) + + # Act + Assert + with pytest.raises(airborne_bootstrap.AirborneBootstrapError) as exc: + _c1_vio_wrapper(_minimal_config(), constructed={}) + assert "c13_fdr" in str(exc.value) + assert "c1_vio" in str(exc.value) diff --git a/tests/unit/test_az918_imu_units.py b/tests/unit/test_az918_imu_units.py new file mode 100644 index 0000000..c5aa205 --- /dev/null +++ b/tests/unit/test_az918_imu_units.py @@ -0,0 +1,103 @@ +"""AZ-918 — `mavlink_imu_to_si_flu` unit + frame conversion. + +Pins the conversion contract so any future change to either the +constant or the body-frame transform is a deliberate, reviewed edit. + +The helper sits between every MAVLink-IMU adapter and the +``nav.ImuSample`` / ``fc.ImuTelemetrySample`` boundary, so a silent +regression here would break C5 ESKF and FDR consumers simultaneously. +""" + +from __future__ import annotations + +import math + +import pytest + +from gps_denied_onboard.helpers.imu_units import ( + MG_TO_M_S2, + MRAD_S_TO_RAD_S, + mavlink_imu_to_si_flu, +) + + +def test_constants_match_si_definition() -> None: + # Assert + assert MG_TO_M_S2 == pytest.approx(9.80665e-3) + assert MRAD_S_TO_RAD_S == pytest.approx(1.0e-3) + + +def test_stationary_frd_body_z_down_becomes_flu_body_z_up_one_g() -> None: + # Arrange — Stationary, level UAV: gravity vector in MAVLink FRD + # body frame points in +Z (down), so the measured specific force + # is -gravity = -Z. Magnitude is exactly one standard gravity. + raw_z_down_mg = -1000.0 + + # Act + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=0.0, + yacc=0.0, + zacc=raw_z_down_mg, + xgyro=0.0, + ygyro=0.0, + zgyro=0.0, + ) + + # Assert — In ESKF/preintegrator FLU body the same specific force + # should appear as +Z = +9.80665 m/s² (matching the C5 ESKF unit + # test's stationary vector). + assert accel_si_flu == pytest.approx((0.0, 0.0, 9.80665)) + assert gyro_si_flu == pytest.approx((0.0, 0.0, 0.0)) + + +def test_frd_to_flu_negates_y_and_z_keeps_x() -> None: + # Arrange — distinguishable components per axis. + # Act + accel_si_flu, gyro_si_flu = mavlink_imu_to_si_flu( + xacc=100.0, yacc=200.0, zacc=-300.0, + xgyro=10.0, ygyro=20.0, zgyro=-30.0, + ) + + # Assert — X stays positive; Y and Z flip sign per the FRD→FLU + # body-frame transform. + assert accel_si_flu == pytest.approx(( + +100.0 * MG_TO_M_S2, + -200.0 * MG_TO_M_S2, + +300.0 * MG_TO_M_S2, + )) + assert gyro_si_flu == pytest.approx(( + +10.0 * MRAD_S_TO_RAD_S, + -20.0 * MRAD_S_TO_RAD_S, + +30.0 * MRAD_S_TO_RAD_S, + )) + + +def test_unit_magnitudes_match_first_csv_row_of_derkachi_fixture() -> None: + # Arrange — CSV row 0 of `data_imu.csv` (Derkachi fixture). UAV is + # in slow descent at ~0.88 m/s, near level, so |accel| should be + # close to 1 g. + raw_xacc, raw_yacc, raw_zacc = 21.0, -3.0, -984.0 + + # Act + accel_si_flu, _ = mavlink_imu_to_si_flu( + xacc=raw_xacc, yacc=raw_yacc, zacc=raw_zacc, + xgyro=0.0, ygyro=0.0, zgyro=0.0, + ) + + # Assert — magnitude is within 5% of one standard gravity (the + # body is descending, not perfectly stationary, so a tight bound + # is wrong; this test pins the order-of-magnitude only). + mag = math.sqrt(sum(c * c for c in accel_si_flu)) + assert 9.30 < mag < 10.30, f"|a|={mag:.3f} m/s² is not near 1 g" + + +def test_zero_input_returns_zero_output() -> None: + # Act + accel, gyro = mavlink_imu_to_si_flu( + xacc=0.0, yacc=0.0, zacc=0.0, + xgyro=0.0, ygyro=0.0, zgyro=0.0, + ) + + # Assert + assert accel == (0.0, 0.0, 0.0) + assert gyro == (0.0, 0.0, 0.0) diff --git a/tests/unit/test_az919_altitude_provider.py b/tests/unit/test_az919_altitude_provider.py new file mode 100644 index 0000000..f670b61 --- /dev/null +++ b/tests/unit/test_az919_altitude_provider.py @@ -0,0 +1,107 @@ +"""AZ-919 — `AltitudeProvider` Protocol + `EskfNominalAltitudeProvider` contract. + +Pins the gating rules that the GSD scale-recovery work (AZ-920) and the +degraded-mode signal (AZ-921) build on top of. +""" + +from __future__ import annotations + +from typing import Any + +import numpy as np + +from gps_denied_onboard._types.state import IsamState +from gps_denied_onboard.helpers.altitude_provider import ( + AltitudeProvider, + EskfNominalAltitudeProvider, +) + + +class _FakeEstimator: + """Minimal ESKF stand-in exposing only the attrs the provider reads.""" + + def __init__( + self, + *, + nominal_pos_z: float, + takeoff_origin_set: bool, + isam2_state: IsamState, + ) -> None: + self._nominal_pos = np.array([0.0, 0.0, nominal_pos_z], dtype=np.float64) + self._takeoff_origin_set: tuple[Any, float, float] | None = ( + ("origin-sentinel", 1.0, 1.0) if takeoff_origin_set else None + ) + self._isam2_state = isam2_state + + +def test_provider_is_runtime_checkable_protocol() -> None: + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: None) + + assert isinstance(provider, AltitudeProvider) + + +def test_returns_none_when_supplier_yields_none() -> None: + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: None) + + assert provider.agl_m(now_ns=1_000_000_000) is None + + +def test_returns_none_before_takeoff_origin_is_anchored() -> None: + # Arrange — origin not yet set, even though nominal_pos has a value. + estimator = _FakeEstimator( + nominal_pos_z=42.0, + takeoff_origin_set=False, + isam2_state=IsamState.INIT, + ) + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: estimator) + + assert provider.agl_m(now_ns=1_000_000_000) is None + + +def test_returns_nominal_pos_z_after_origin_is_anchored() -> None: + # Arrange + estimator = _FakeEstimator( + nominal_pos_z=83.5, + takeoff_origin_set=True, + isam2_state=IsamState.TRACKING, + ) + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: estimator) + + # Act + Assert + assert provider.agl_m(now_ns=1_000_000_000) == 83.5 + + +def test_returns_none_when_estimator_is_lost() -> None: + # Arrange — origin anchored and altitude is non-zero, but the + # filter has flipped to LOST; the AGL signal is no longer trustworthy. + estimator = _FakeEstimator( + nominal_pos_z=120.0, + takeoff_origin_set=True, + isam2_state=IsamState.LOST, + ) + provider = EskfNominalAltitudeProvider(estimator_supplier=lambda: estimator) + + assert provider.agl_m(now_ns=1_000_000_000) is None + + +def test_supplier_is_re_resolved_per_call() -> None: + # Arrange — the composition root builds C1 before C5, so the + # supplier must read from a mutable container at call time. + holder: dict[str, _FakeEstimator] = {} + provider = EskfNominalAltitudeProvider( + estimator_supplier=lambda: holder.get("c5_state"), + ) + + # Act — first call: C5 not yet built. + first = provider.agl_m(now_ns=1_000_000_000) + # Then C5 lands in the dict. + holder["c5_state"] = _FakeEstimator( + nominal_pos_z=37.0, + takeoff_origin_set=True, + isam2_state=IsamState.TRACKING, + ) + second = provider.agl_m(now_ns=2_000_000_000) + + # Assert + assert first is None + assert second == 37.0 diff --git a/tests/unit/test_az921_vio_output_scale_quality.py b/tests/unit/test_az921_vio_output_scale_quality.py new file mode 100644 index 0000000..473c92a --- /dev/null +++ b/tests/unit/test_az921_vio_output_scale_quality.py @@ -0,0 +1,91 @@ +"""AZ-921 AC-4 — VioOutput.scale_quality DTO contract.""" + +from __future__ import annotations + +import typing +from dataclasses import fields + +import numpy as np + +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + ScaleQuality, + VioOutput, +) +from gps_denied_onboard.helpers.se3_utils import SE3, matrix_to_se3 + + +def _imu_bias() -> ImuBias: + return ImuBias( + accel_bias=(0.0, 0.0, 0.0), + gyro_bias=(0.0, 0.0, 0.0), + ) + + +def _feature_quality() -> FeatureQuality: + return FeatureQuality( + tracked=10, + new=2, + lost=1, + mean_parallax=1.0, + mre_px=0.5, + ) + + +def _identity_se3() -> SE3: + return matrix_to_se3(np.eye(4, dtype=np.float64)) + + +def test_vio_output_default_scale_quality_is_unknown_for_back_compat() -> None: + # Arrange / Act — constructor MUST work without scale_quality so + # legacy strategies (OKVIS2, VINS-Mono) that have not been updated + # for AZ-921 stay bug-for-bug compatible. + vio = VioOutput( + frame_id="frame-0", + relative_pose_T=_identity_se3(), + pose_covariance_6x6=np.eye(6, dtype=np.float64), + imu_bias=_imu_bias(), + feature_quality=_feature_quality(), + emitted_at_ns=1_000_000_000, + ) + + # Assert + assert vio.scale_quality == "unknown" + + +def test_vio_output_accepts_each_scale_quality_value() -> None: + # Arrange + accepted_values = ("metric", "direction_only", "unknown") + + # Act / Assert + for value in accepted_values: + vio = VioOutput( + frame_id="frame-0", + relative_pose_T=_identity_se3(), + pose_covariance_6x6=np.eye(6, dtype=np.float64), + imu_bias=_imu_bias(), + feature_quality=_feature_quality(), + emitted_at_ns=1_000_000_000, + scale_quality=value, + ) + assert vio.scale_quality == value + + +def test_scale_quality_literal_pins_the_three_documented_values() -> None: + # Arrange / Act — the Literal type from the typing module exposes + # its parameters via __args__, which lets the contract test pin + # the exact accepted-string set rather than relying on a string + # comparison alone. + args = typing.get_args(ScaleQuality) + + # Assert + assert set(args) == {"metric", "direction_only", "unknown"} + + +def test_vio_output_dataclass_exposes_scale_quality_field() -> None: + # Arrange / Act + field_names = {f.name for f in fields(VioOutput)} + + # Assert + assert "scale_quality" in field_names