diff --git a/_docs/02_tasks/todo/AZ-331_c1_vio_strategy_protocol.md b/_docs/02_tasks/done/AZ-331_c1_vio_strategy_protocol.md similarity index 100% rename from _docs/02_tasks/todo/AZ-331_c1_vio_strategy_protocol.md rename to _docs/02_tasks/done/AZ-331_c1_vio_strategy_protocol.md diff --git a/src/gps_denied_onboard/_types/nav.py b/src/gps_denied_onboard/_types/nav.py index b4c440f..b7e9b16 100644 --- a/src/gps_denied_onboard/_types/nav.py +++ b/src/gps_denied_onboard/_types/nav.py @@ -1,16 +1,25 @@ -"""Navigation-side DTOs: camera frames, IMU samples, attitude, FC flight state, GPS health. +"""Navigation-side DTOs: camera frames, IMU samples, attitude, VIO output. -These are type-only stubs created by AZ-263 (Bootstrap). Concrete field semantics are -defined in `_docs/02_document/architecture.md § 4` and the C1 / C5 / C8 component -specs. Concrete subclasses are owned by the components that emit them; downstream -consumers depend on the DTOs declared here. +Type-only stubs created by AZ-263 (Bootstrap) and extended by AZ-331 +(C1 VioStrategy contract freeze, v1.0.0). Concrete field semantics +are defined in ``_docs/02_document/architecture.md § 4`` and the C1 +/ C5 / C8 component specs. Concrete subclasses are owned by the +components that emit them; downstream consumers depend on the DTOs +declared here. Cross-component DTOs (``VioOutput``, ``VioHealth``) +live at this L1 layer per module-layout.md — components MUST NOT +import them from other components. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime -from typing import Any +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import numpy.typing as npt + + from gps_denied_onboard.helpers.se3_utils import SE3 @dataclass(frozen=True) @@ -75,3 +84,99 @@ class AttitudeWindow: # canonical shape uses enums + monotonic_ns timestamps; the old stubs # from AZ-263 used `str` + `datetime` and were never wired by any # production producer or consumer). + + +# ---------------------------------------------------------------------- +# AZ-331 — C1 VioStrategy contract DTOs (v1.0.0). + +from enum import Enum + + +class VioState(str, Enum): + """C1 VIO health state reported on every ``VioHealth`` snapshot.""" + + INIT = "init" + TRACKING = "tracking" + DEGRADED = "degraded" + LOST = "lost" + + +@dataclass(frozen=True) +class FeatureQuality: + """Per-frame feature-tracking diagnostics (C1 contract v1.0.0). + + Surfaced inside :class:`VioOutput`; consumed by C13 FDR + C5 fusion + for adaptive-gating decisions. + """ + + tracked: int + new: int + lost: int + mean_parallax: float + mre_px: float + + +@dataclass(frozen=True) +class WarmStartPose: + """Hint passed to ``VioStrategy.reset_to_warm_start`` after F8 reboot. + + ``body_T_world`` is a :class:`gtsam.Pose3` (= ``SE3``); the warm-start + + F8 reboot recovery wiring task in E-C1 owns the on-disk persistence + pattern. ``captured_at_ns`` is :func:`time.monotonic_ns` at hint + production time. + """ + + body_T_world: "SE3" + velocity_b: tuple[float, float, float] + bias: "ImuBias" + captured_at_ns: int + + +@dataclass(frozen=True) +class VioOutput: + """C1 strategy per-frame output (C1 contract v1.0.0). + + ``frame_id`` is a stable string identifier echoed from the input + ``NavCameraFrame.frame_id`` (stringified if the source is an int + or UUID); C5 uses it as a hashable key into its frame→GTSAM-key + map (Invariant — alignment). + + ``relative_pose_T`` is the strategy's current pose as a + :class:`gtsam.Pose3`; expressed in the strategy's own internal + frame (VIO has no absolute world reference — the internal frame + drifts from world over time). C5 computes the between-factor + delta via ``prev.between(curr)`` and inserts ``curr`` as the + iSAM2 Values entry. + + ``pose_covariance_6x6`` is the symmetric positive-definite 6×6 + covariance in the rotation-then-translation tangent-space order; + strategies MUST NOT tighten this during a degradation event + (Invariant — honest covariance is the AC-NEW-4 / AC-NEW-7 floor). + + ``imu_bias`` is the strategy's latest bias estimate; + ``feature_quality`` is per-frame tracker diagnostics; + ``emitted_at_ns`` is :func:`time.monotonic_ns` at output time. + """ + + frame_id: str + relative_pose_T: "SE3" + pose_covariance_6x6: "npt.NDArray[Any]" + imu_bias: "ImuBias" + feature_quality: FeatureQuality + emitted_at_ns: int + + +@dataclass(frozen=True) +class VioHealth: + """C1 strategy health snapshot (C1 contract v1.0.0). + + Returned from :meth:`VioStrategy.health_snapshot`; consumed by C13 + FDR and the composition-root watchdog. ``consecutive_lost`` ticks + every time ``state == LOST``; the strategy raises + :class:`VioFatalError` once this exceeds + ``config.vio.lost_frame_threshold`` (default 9). + """ + + state: VioState + consecutive_lost: int + bias_norm: float diff --git a/src/gps_denied_onboard/_types/vio.py b/src/gps_denied_onboard/_types/vio.py deleted file mode 100644 index e43da6a..0000000 --- a/src/gps_denied_onboard/_types/vio.py +++ /dev/null @@ -1,21 +0,0 @@ -"""C1 VIO output DTO.""" - -from __future__ import annotations - -from dataclasses import dataclass, field -from datetime import datetime -from typing import Any - - -@dataclass(frozen=True) -class VioOutput: - """VIO pose + uncertainty + health bundle. - - Concrete semantics in `_docs/02_document/components/01_c1_vio/description.md § 2`. - """ - - frame_id: int - timestamp: datetime - pose_se3: Any - covariance_6x6: Any | None = None - health_flags: dict[str, Any] = field(default_factory=dict) diff --git a/src/gps_denied_onboard/components/c1_vio/__init__.py b/src/gps_denied_onboard/components/c1_vio/__init__.py index c59d371..700f6fa 100644 --- a/src/gps_denied_onboard/components/c1_vio/__init__.py +++ b/src/gps_denied_onboard/components/c1_vio/__init__.py @@ -1,6 +1,52 @@ -"""C1 VIO component — Public API.""" +"""C1 VIO — Public API (AZ-331). -from gps_denied_onboard._types.vio import VioOutput +Per ``vio_strategy_protocol.md`` v1.0.0 the public surface consists +of: + +- :class:`VioStrategy` Protocol (4 methods). +- DTOs / enum re-exported from :mod:`gps_denied_onboard._types.nav` + (the L1 home for cross-component DTOs): :class:`VioOutput`, + :class:`VioHealth`, :class:`FeatureQuality`, :class:`WarmStartPose`, + :class:`VioState`. +- Error family rooted at :class:`VioError`; three documented subtypes. +- Config block :class:`C1VioConfig` (registered on import). + +Concrete strategies (``Okvis2Strategy``, ``VinsMonoStrategy``, +``KltRansacStrategy``) live in sibling modules and are imported +lazily by :mod:`gps_denied_onboard.runtime_root.vio_factory` — +Risk-2 mitigation: this ``__init__.py`` MUST NOT import any concrete +strategy module. +""" + +from gps_denied_onboard._types.nav import ( + FeatureQuality, + VioHealth, + VioOutput, + VioState, + WarmStartPose, +) +from gps_denied_onboard.components.c1_vio.config import C1VioConfig +from gps_denied_onboard.components.c1_vio.errors import ( + VioDegradedError, + VioError, + VioFatalError, + VioInitializingError, +) from gps_denied_onboard.components.c1_vio.interface import VioStrategy +from gps_denied_onboard.config.schema import register_component_block -__all__ = ["VioOutput", "VioStrategy"] +register_component_block("c1_vio", C1VioConfig) + +__all__ = [ + "C1VioConfig", + "FeatureQuality", + "VioDegradedError", + "VioError", + "VioFatalError", + "VioHealth", + "VioInitializingError", + "VioOutput", + "VioState", + "VioStrategy", + "WarmStartPose", +] diff --git a/src/gps_denied_onboard/components/c1_vio/config.py b/src/gps_denied_onboard/components/c1_vio/config.py new file mode 100644 index 0000000..059de37 --- /dev/null +++ b/src/gps_denied_onboard/components/c1_vio/config.py @@ -0,0 +1,63 @@ +"""C1 VIO strategy config block (AZ-331). + +Registered into ``config.components['c1_vio']`` by the package +``__init__.py``. The composition-root factory +:func:`gps_denied_onboard.runtime_root.vio_factory.build_vio_strategy` +reads this block to select the strategy and configure the LOST→FATAL +transition + warm-start convergence budget. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Final + +from gps_denied_onboard.config.schema import ConfigError + +__all__ = [ + "C1VioConfig", + "KNOWN_STRATEGIES", +] + +KNOWN_STRATEGIES: Final[frozenset[str]] = frozenset( + {"okvis2", "vins_mono", "klt_ransac"} +) + + +@dataclass(frozen=True) +class C1VioConfig: + """Per-component config for C1 VIO. + + ``strategy`` selects exactly one of the three backends + (``okvis2`` / ``vins_mono`` / ``klt_ransac``); the + composition-root factory respects compile-time ``BUILD_*`` gating + on top of this label. + + ``lost_frame_threshold`` is the number of consecutive ``LOST`` + frames before the strategy raises :class:`VioFatalError`; + default 9 per ``vio_strategy_protocol.md`` v1.0.0. + + ``warm_start_max_frames`` is the convergence budget after + :meth:`VioStrategy.reset_to_warm_start`; default 5. + """ + + strategy: str = "klt_ransac" + lost_frame_threshold: int = 9 + warm_start_max_frames: int = 5 + + def __post_init__(self) -> None: + if self.strategy not in KNOWN_STRATEGIES: + raise ConfigError( + f"C1VioConfig.strategy={self.strategy!r} not in " + f"{sorted(KNOWN_STRATEGIES)}" + ) + if self.lost_frame_threshold < 1: + raise ConfigError( + f"C1VioConfig.lost_frame_threshold must be >= 1; " + f"got {self.lost_frame_threshold}" + ) + if self.warm_start_max_frames < 1: + raise ConfigError( + f"C1VioConfig.warm_start_max_frames must be >= 1; " + f"got {self.warm_start_max_frames}" + ) diff --git a/src/gps_denied_onboard/components/c1_vio/errors.py b/src/gps_denied_onboard/components/c1_vio/errors.py new file mode 100644 index 0000000..0ae1231 --- /dev/null +++ b/src/gps_denied_onboard/components/c1_vio/errors.py @@ -0,0 +1,64 @@ +"""C1 VioStrategy error taxonomy (AZ-331). + +Every ``VioStrategy`` method raises only members of :class:`VioError`. +Lower-level exceptions from OpenCV / OKVIS2 / VINS-Mono / GTSAM MUST +be caught and rewrapped — the contract closes the error envelope so +consumers can ``except VioError`` once and handle the family. + +:class:`VioDegradedError` is documented but is **not raised** during +the normal degraded-operation path: degraded operation returns a +``VioOutput`` with inflated covariance and ``VioHealth.state == DEGRADED``. +The error type exists for the rare degraded→fatal transition. + +A separate composition-time error +(:class:`gps_denied_onboard.runtime_root.errors.StrategyNotAvailableError`) +lives outside this family — it is raised by the factory, not by a +``VioStrategy`` method. +""" + +from __future__ import annotations + +__all__ = [ + "VioDegradedError", + "VioError", + "VioFatalError", + "VioInitializingError", +] + + +class VioError(Exception): + """Base class for the C1 VIO error family. + + All three documented subtypes are children. Consumers catch the + family with ``except c1_vio.errors.VioError as e``. + """ + + +class VioInitializingError(VioError): + """Raised while ``VioHealth.state == INIT`` and the strategy has no + pose to emit. + + C5 fusion catches this and falls back to the FC IMU prior until + the strategy reports ``TRACKING``. + """ + + +class VioDegradedError(VioError): + """Raised on the rare degraded→fatal transition. + + The normal degraded-operation path is NOT this exception — it is + a ``VioOutput`` with inflated covariance + ``VioHealth.state == + DEGRADED``. This type exists only so consumer ``except VioError`` + wrappers can name the family member explicitly. + """ + + +class VioFatalError(VioError): + """Raised once ``consecutive_lost`` exceeds the configured threshold + (default 9) or on irrecoverable backend init failure during + :meth:`VioStrategy.reset_to_warm_start`. + + The AC-5.2 fallback path engages once this fires: the camera + ingest loop stops feeding the strategy and the watchdog flips + the composition root into FC-IMU-only mode. + """ diff --git a/src/gps_denied_onboard/components/c1_vio/interface.py b/src/gps_denied_onboard/components/c1_vio/interface.py index cd208ed..4824498 100644 --- a/src/gps_denied_onboard/components/c1_vio/interface.py +++ b/src/gps_denied_onboard/components/c1_vio/interface.py @@ -1,20 +1,94 @@ -"""C1 `VioStrategy` Protocol. +"""C1 ``VioStrategy`` Protocol (AZ-331). -Concrete strategies: OKVIS2 (default), VINS-Mono (research-only), KLT/RANSAC -(mandatory simple baseline). See `_docs/02_document/components/01_c1_vio/`. +PEP 544 ``typing.Protocol`` with ``runtime_checkable=True``; four +methods that span the camera-ingest hot path +(``process_frame``), F8 reboot recovery (``reset_to_warm_start``), +diagnostics (``health_snapshot``), and self-identification +(``current_strategy_label``). + +Concrete impls — :class:`Okvis2Strategy` (AZ-332), +:class:`VinsMonoStrategy` (AZ-333), :class:`KltRansacStrategy` +(AZ-334) — live in sibling modules and are imported lazily by +:mod:`gps_denied_onboard.runtime_root.vio_factory`. + +The contract at +``_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md`` +v1.0.0 is the authoritative shape; this module mirrors it 1:1. """ from __future__ import annotations -from typing import Protocol +from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable -from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame -from gps_denied_onboard._types.vio import VioOutput +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import ( + ImuWindow, + NavCameraFrame, + VioHealth, + VioOutput, + WarmStartPose, + ) + +__all__ = ["VioStrategy"] +@runtime_checkable class VioStrategy(Protocol): - """Visual-Inertial-Odometry strategy.""" + """On-Jetson visual / visual-inertial odometry runtime. - def step(self, frame: NavCameraFrame, imu: ImuWindow) -> VioOutput: - """Process a single nav-camera frame + IMU window and return a VIO update.""" + Implementations: + :class:`Okvis2Strategy` (production-default, OKVIS2 SLAM), + :class:`VinsMonoStrategy` (research-only), + :class:`KltRansacStrategy` (mandatory simple-baseline per ADR-002). + Selection is owned by the composition root. + + Invariants (see ``vio_strategy_protocol.md`` v1.0.0): + - Single-threaded per instance (one camera-ingest writer thread). + - ``current_strategy_label()`` is constant per instance and + equals ``config.vio.strategy`` exactly. + - Error envelope is closed — only members of + :class:`VioError` escape ``process_frame``. + """ + + def process_frame( + self, + frame: "NavCameraFrame", + imu: "ImuWindow", + calibration: "CameraCalibration", + ) -> "VioOutput": + """Camera-ingest hot-path call (one per nav-camera frame). + + ``VioOutput.frame_id`` MUST equal ``frame.frame_id`` (C5 + alignment invariant). Raises :class:`VioInitializingError` + while booting (state == INIT; no output emitted) and + :class:`VioFatalError` once ``consecutive_lost`` exceeds the + configured threshold. During DEGRADED operation the method + returns normally with an inflated covariance — NOT raises + :class:`VioDegradedError`. + """ + ... + + def reset_to_warm_start(self, hint: "WarmStartPose") -> None: + """Destructive re-init from an F8-reboot warm-start hint. + + Clears keyframe window, IMU integration state, feature + tracks. Subsequent ``process_frame`` calls re-initialise from + the hint. Raises :class:`VioFatalError` only on irrecoverable + backend init failure. + """ + ... + + def health_snapshot(self) -> "VioHealth": + """Most-recent health state — FDR-stamped per the AC-NEW-3 audit.""" + ... + + def current_strategy_label( + self, + ) -> Literal["okvis2", "vins_mono", "klt_ransac"]: + """Identify which concrete strategy is wired here. + + Returned string equals ``config.vio.strategy`` exactly. + AC-NEW-3 FDR audit relies on this property. + """ ... diff --git a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py index 8f013f1..f36b65d 100644 --- a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py +++ b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py @@ -449,9 +449,9 @@ class EskfStateEstimator(StateEstimator): residual in the previous body frame. """ self._close_cold_start_window() - ts_ns = _datetime_to_ns(vio.timestamp) + ts_ns = vio.emitted_at_ns self._guard_timestamp(ts_ns, source="vio") - curr_pose = _pose_se3_to_array(vio.pose_se3) + curr_pose = vio.relative_pose_T.matrix() if self._prev_vio_pose is None: self._prev_vio_pose = curr_pose @@ -498,7 +498,7 @@ class EskfStateEstimator(StateEstimator): H = np.zeros((6, _N_STATE), dtype=np.float64) H[0:3, _IDX_POS] = np.eye(3) H[3:6, _IDX_ROT] = prev_R # rotate body-frame perturbation back to world - R_meas = _measurement_noise(vio.covariance_6x6) + R_meas = _measurement_noise(vio.pose_covariance_6x6) try: self._kalman_update(H, residual, R_meas, source="vio") diff --git a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py index 41ff6ac..684e60a 100644 --- a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py +++ b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py @@ -6,7 +6,7 @@ real bodies. AZ-383 owns the three Protocol factor-add methods: * ``add_vio(vio: VioOutput)`` — ``BetweenFactorPose3`` between consecutive pose keys with a noise model derived from - ``vio.covariance_6x6``. + ``vio.pose_covariance_6x6``. * ``add_pose_anchor(pose: PoseEstimate)`` — mode-dispatched per ``pose.covariance_mode``: ``"marginals"`` → ``PriorFactorPose3`` + ``update``; ``"jacobian"`` → skip iSAM2 add (per the AZ-361 cross-task @@ -403,7 +403,7 @@ class GtsamIsam2StateEstimator(StateEstimator): """Register a callback fired exactly once per fallback recovery.""" return self._fallback.subscribe_recovered(callback) - def key_for_frame(self, frame_id: UUID | int) -> int: + def key_for_frame(self, frame_id: UUID | int | str) -> int: """Return the GTSAM ``Key`` for ``frame_id``, assigning on first use. AZ-383 calls this from ``add_vio`` and ``add_pose_anchor`` to @@ -653,10 +653,10 @@ class GtsamIsam2StateEstimator(StateEstimator): """ handle = self._require_handle() self._close_cold_start_window() - ts_ns = _datetime_to_ns(vio.timestamp) + ts_ns = vio.emitted_at_ns self._guard_timestamp(ts_ns, source="vio") - curr_pose = _pose_se3_to_gtsam(vio.pose_se3) + curr_pose = vio.relative_pose_T curr_key = self.key_for_frame(vio.frame_id) if self._prev_vio is None: @@ -674,10 +674,10 @@ class GtsamIsam2StateEstimator(StateEstimator): ) return - prev_pose = _pose_se3_to_gtsam(self._prev_vio.pose_se3) + prev_pose = self._prev_vio.relative_pose_T prev_key = self.key_for_frame(self._prev_vio.frame_id) relative_pose = prev_pose.between(curr_pose) - noise = _build_pose_noise(vio.covariance_6x6) + noise = _build_pose_noise(vio.pose_covariance_6x6) factor = gtsam.BetweenFactorPose3(prev_key, curr_key, relative_pose, noise) try: diff --git a/src/gps_denied_onboard/components/c5_state/interface.py b/src/gps_denied_onboard/components/c5_state/interface.py index 9a00d22..ce170cf 100644 --- a/src/gps_denied_onboard/components/c5_state/interface.py +++ b/src/gps_denied_onboard/components/c5_state/interface.py @@ -26,7 +26,7 @@ if TYPE_CHECKING: EstimatorHealth, EstimatorOutput, ) - from gps_denied_onboard._types.vio import VioOutput + from gps_denied_onboard._types.nav import VioOutput __all__ = ["StateEstimator"] diff --git a/src/gps_denied_onboard/healthcheck.py b/src/gps_denied_onboard/healthcheck.py index 3f4d31e..6938412 100644 --- a/src/gps_denied_onboard/healthcheck.py +++ b/src/gps_denied_onboard/healthcheck.py @@ -23,12 +23,12 @@ def check() -> int: from gps_denied_onboard._types import ( # noqa: F401 calibration, emitted, + inference, manifests, matching, nav, pose, tile, - vio, vpr, ) from gps_denied_onboard.logging import get_logger # noqa: F401 diff --git a/src/gps_denied_onboard/runtime_root/errors.py b/src/gps_denied_onboard/runtime_root/errors.py index b495fa4..306f1c2 100644 --- a/src/gps_denied_onboard/runtime_root/errors.py +++ b/src/gps_denied_onboard/runtime_root/errors.py @@ -2,7 +2,7 @@ These are raised at composition time (``build_*`` factory entry) and NOT during the running flight. Components own their per-runtime error -families; this module owns the cross-component selection error. +families; this module owns the cross-component selection errors. """ from __future__ import annotations @@ -22,3 +22,15 @@ class RuntimeNotAvailableError(RuntimeError): The message MUST name the requested runtime label so the operator can correlate against ``.env``'s ``BUILD_*`` matrix without guessing. """ + + +class StrategyNotAvailableError(RuntimeError): + """Raised when ``build_vio_strategy`` is asked for a VIO strategy whose + compile-time ``BUILD_*`` flag is OFF (AZ-331). + + Distinct from :class:`RuntimeNotAvailableError` because the C1 + contract names this error type explicitly (AC-5). The message + MUST name both the requested strategy label and the missing + ``BUILD_*`` flag so the operator can correlate against the + binary's compile matrix. + """ diff --git a/src/gps_denied_onboard/runtime_root/vio_factory.py b/src/gps_denied_onboard/runtime_root/vio_factory.py new file mode 100644 index 0000000..6b6d0cb --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/vio_factory.py @@ -0,0 +1,114 @@ +"""C1 VIO strategy composition-root factory (AZ-331). + +:func:`build_vio_strategy` selects exactly one strategy by +``config.components['c1_vio'].strategy`` and respects compile-time +``BUILD_*`` gating: requesting a strategy whose flag is OFF raises +:class:`StrategyNotAvailableError` at composition time (NOT at first +frame). + +Concrete strategy modules (``okvis2``, ``vins_mono``, ``klt_ransac``) +are imported lazily — a Tier-0 workstation build with +``BUILD_OKVIS2=OFF`` MUST NOT load ``c1_vio.okvis2`` (Risk-2 / I-5; +verifiable via ``sys.modules``). +""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError + +if TYPE_CHECKING: + from gps_denied_onboard.components.c1_vio import ( + C1VioConfig, + VioStrategy, + ) + from gps_denied_onboard.components.c13_fdr import FdrClient + from gps_denied_onboard.config.schema import Config + +__all__ = ["build_vio_strategy"] + + +_STRATEGY_TO_BUILD_FLAG: dict[str, str] = { + "okvis2": "BUILD_OKVIS2", + "vins_mono": "BUILD_VINS_MONO", + "klt_ransac": "BUILD_KLT_RANSAC", +} + +_STRATEGY_TO_MODULE: dict[str, tuple[str, str]] = { + "okvis2": ("gps_denied_onboard.components.c1_vio.okvis2", "Okvis2Strategy"), + "vins_mono": ( + "gps_denied_onboard.components.c1_vio.vins_mono", + "VinsMonoStrategy", + ), + "klt_ransac": ( + "gps_denied_onboard.components.c1_vio.klt_ransac", + "KltRansacStrategy", + ), +} + + +def _is_build_flag_on(flag_name: str) -> bool: + """Read a compile-time ``BUILD_*`` flag from the environment. + + ``ON`` / ``1`` / ``true`` / ``yes`` (case-insensitive) → ``True``; + anything else (including unset) → ``False``. Defaults to OFF so + test environments must opt-in explicitly per strategy. + """ + raw = os.environ.get(flag_name, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} + + +def _c1_config(config: "Config") -> "C1VioConfig": + """Pull the registered C1 config block. + + ``c1_vio.__init__`` registers it on import; a missing + registration is a developer error and surfaces as ``KeyError`` + rather than a silent fallback. + """ + return config.components["c1_vio"] + + +def build_vio_strategy( + config: "Config", + *, + fdr_client: "FdrClient", +) -> "VioStrategy": + """Construct the :class:`VioStrategy` impl selected by config. + + 1. Reads ``config.components['c1_vio'].strategy``. + 2. Checks the matching ``BUILD_*`` flag — if OFF, raises + :class:`StrategyNotAvailableError` BEFORE any import. + 3. Lazily imports the concrete strategy module. + 4. Constructs and returns the strategy instance, passing + ``config`` and ``fdr_client``. + + Raises :class:`StrategyNotAvailableError` when the compile-time + flag is OFF (canonical Tier-0 path) or when the concrete strategy + module has not been built yet (AZ-332 / AZ-333 / AZ-334 pending). + """ + block = _c1_config(config) + strategy = block.strategy + flag_name = _STRATEGY_TO_BUILD_FLAG.get(strategy) + module_info = _STRATEGY_TO_MODULE.get(strategy) + if flag_name is None or module_info is None: + raise StrategyNotAvailableError( + f"VioStrategy {strategy!r} is not buildable in this binary." + ) + if not _is_build_flag_on(flag_name): + raise StrategyNotAvailableError( + f"VioStrategy {strategy!r} requires {flag_name}=ON in this " + "binary; the flag is OFF." + ) + module_name, class_name = module_info + try: + module = __import__(module_name, fromlist=[class_name]) + except ModuleNotFoundError as exc: + raise StrategyNotAvailableError( + f"VioStrategy {strategy!r} is configured but its concrete impl " + f"module {module_name!r} has not been built into this binary " + "yet (AZ-332 / AZ-333 / AZ-334 pending)." + ) from exc + strategy_cls = getattr(module, class_name) + return strategy_cls(config, fdr_client=fdr_client) diff --git a/tests/unit/c1_vio/test_protocol_conformance.py b/tests/unit/c1_vio/test_protocol_conformance.py new file mode 100644 index 0000000..87effce --- /dev/null +++ b/tests/unit/c1_vio/test_protocol_conformance.py @@ -0,0 +1,448 @@ +"""AZ-331 — C1 VioStrategy Protocol + DTO + error + factory conformance. + +Covers all 9 ACs of AZ-331 plus NFR-perf-factory and +NFR-reliability-error-family. The factory ACs (AC-4 / AC-5) substitute +fake strategy modules at ``sys.modules`` boundaries so the test never +touches OKVIS2 / VINS-Mono / OpenCV native libraries. +""" + +from __future__ import annotations + +import dataclasses +import re +import sys +import time +import types +from pathlib import Path + +import gtsam +import numpy as np +import pytest + +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + VioHealth, + VioOutput, + VioState, + WarmStartPose, +) +from gps_denied_onboard.components.c1_vio import ( + C1VioConfig, + VioDegradedError, + VioError, + VioFatalError, + VioInitializingError, + VioStrategy, +) +from gps_denied_onboard.components.c1_vio.config import KNOWN_STRATEGIES +from gps_denied_onboard.config.schema import Config, ConfigError +from gps_denied_onboard.runtime_root.errors import StrategyNotAvailableError +from gps_denied_onboard.runtime_root.vio_factory import build_vio_strategy + + +_CONTRACT_PATH = ( + Path(__file__).resolve().parents[3] + / "_docs/02_document/contracts/c1_vio/vio_strategy_protocol.md" +) +_STRATEGY_MODULES: dict[str, tuple[str, str, str]] = { + "okvis2": ( + "gps_denied_onboard.components.c1_vio.okvis2", + "Okvis2Strategy", + "BUILD_OKVIS2", + ), + "vins_mono": ( + "gps_denied_onboard.components.c1_vio.vins_mono", + "VinsMonoStrategy", + "BUILD_VINS_MONO", + ), + "klt_ransac": ( + "gps_denied_onboard.components.c1_vio.klt_ransac", + "KltRansacStrategy", + "BUILD_KLT_RANSAC", + ), +} + + +# ---------------------------------------------------------------------- +# Fakes that structurally satisfy the VioStrategy Protocol. + + +class _FullVioStrategy: + def __init__(self, config: Config, *, fdr_client) -> None: + self.config = config + self.fdr_client = fdr_client + self._label = config.components["c1_vio"].strategy + + def process_frame(self, frame, imu, calibration): + raise NotImplementedError + + def reset_to_warm_start(self, hint): + return None + + def health_snapshot(self): + return VioHealth(state=VioState.INIT, consecutive_lost=0, bias_norm=0.0) + + def current_strategy_label(self): + return self._label + + +class _PartialVioStrategy: + def process_frame(self, frame, imu, calibration): + raise NotImplementedError + + def reset_to_warm_start(self, hint): + return None + + +def _config_with_strategy(strategy: str) -> Config: + return Config.with_blocks(c1_vio=C1VioConfig(strategy=strategy)) + + +def _install_fake_strategy(strategy_label: str) -> type: + module_name, class_name, _flag = _STRATEGY_MODULES[strategy_label] + + class _FakeStrategy(_FullVioStrategy): + pass + + _FakeStrategy.__name__ = class_name + module = types.ModuleType(module_name) + setattr(module, class_name, _FakeStrategy) + sys.modules[module_name] = module + return _FakeStrategy + + +@pytest.fixture +def strategy_module_cleanup(): + """Pop every fake strategy module before/after each factory test.""" + for module_name, _, _ in _STRATEGY_MODULES.values(): + sys.modules.pop(module_name, None) + yield + for module_name, _, _ in _STRATEGY_MODULES.values(): + sys.modules.pop(module_name, None) + + +def _zero_bias() -> ImuBias: + return ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) + + +def _neutral_feature_quality() -> FeatureQuality: + return FeatureQuality(tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0) + + +def _make_vio_output(frame_id: str = "frame-0001") -> VioOutput: + return VioOutput( + frame_id=frame_id, + relative_pose_T=gtsam.Pose3(np.eye(4)), + pose_covariance_6x6=np.eye(6) * 0.01, + imu_bias=_zero_bias(), + feature_quality=_neutral_feature_quality(), + emitted_at_ns=1_000_000_000, + ) + + +# ---------------------------------------------------------------------- +# AC-1: Protocol is conformance-checkable. + + +def test_ac1_vio_strategy_conformance_full() -> None: + instance = _FullVioStrategy(_config_with_strategy("klt_ransac"), fdr_client=None) + assert isinstance(instance, VioStrategy) + + +def test_ac1_vio_strategy_conformance_partial_missing_methods() -> None: + assert not isinstance(_PartialVioStrategy(), VioStrategy) + + +# ---------------------------------------------------------------------- +# AC-2: frozen DTOs reject mutation. + + +@pytest.mark.parametrize( + "dto, field_name, new_value", + [ + (_make_vio_output(), "frame_id", "renamed"), + ( + VioHealth(state=VioState.TRACKING, consecutive_lost=0, bias_norm=0.0), + "state", + VioState.LOST, + ), + ( + WarmStartPose( + body_T_world=gtsam.Pose3(np.eye(4)), + velocity_b=(0.0, 0.0, 0.0), + bias=_zero_bias(), + captured_at_ns=1_000_000_000, + ), + "captured_at_ns", + 0, + ), + (_neutral_feature_quality(), "tracked", 0), + ], +) +def test_ac2_frozen_dtos_reject_mutation(dto, field_name: str, new_value) -> None: + original_value = getattr(dto, field_name) + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(dto, field_name, new_value) + assert getattr(dto, field_name) == original_value + + +# ---------------------------------------------------------------------- +# AC-3: error hierarchy catchable as a single family. + + +@pytest.mark.parametrize( + "exc_factory", + [VioInitializingError, VioDegradedError, VioFatalError], +) +def test_ac3_all_vio_errors_caught_as_family(exc_factory) -> None: + with pytest.raises(VioError): + raise exc_factory("boom") + + +def test_ac3_unrelated_exception_not_caught_as_family() -> None: + with pytest.raises(ValueError): + try: + raise ValueError("not us") + except VioError: + pytest.fail("ValueError must not be caught as VioError") + + +def test_ac3_strategy_not_available_outside_family() -> None: + with pytest.raises(StrategyNotAvailableError): + try: + raise StrategyNotAvailableError("composition-time") + except VioError: + pytest.fail( + "StrategyNotAvailableError is a composition-root error " + "and MUST NOT be in the c1 VioError family" + ) + + +# ---------------------------------------------------------------------- +# AC-4 + AC-5: factory honours config + BUILD flag gate. + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac4_build_vio_strategy_returns_protocol_impl( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + fake_cls = _install_fake_strategy(strategy) + config = _config_with_strategy(strategy) + instance = build_vio_strategy(config, fdr_client=object()) + assert isinstance(instance, fake_cls) + assert isinstance(instance, VioStrategy) + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac5_build_vio_strategy_flag_off_no_import( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + module_name, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.delenv(flag, raising=False) + config = _config_with_strategy(strategy) + with pytest.raises(StrategyNotAvailableError) as exc_info: + build_vio_strategy(config, fdr_client=object()) + assert strategy in str(exc_info.value) + assert flag in str(exc_info.value) + assert module_name not in sys.modules + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac5_build_vio_strategy_flag_on_but_module_missing( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + config = _config_with_strategy(strategy) + with pytest.raises(StrategyNotAvailableError) as exc_info: + build_vio_strategy(config, fdr_client=object()) + assert strategy in str(exc_info.value) + + +# ---------------------------------------------------------------------- +# AC-6: unknown strategy label rejected at config load. + + +@pytest.mark.parametrize( + "bad_label", + ["openvslam", "orbslam3", "OKVIS2", "okvis", ""], +) +def test_ac6_unknown_strategy_rejected_at_config_load(bad_label: str) -> None: + with pytest.raises(ConfigError) as exc_info: + C1VioConfig(strategy=bad_label) + msg = str(exc_info.value) + for valid in KNOWN_STRATEGIES: + assert valid in msg + + +# ---------------------------------------------------------------------- +# AC-7: current_strategy_label() matches config exactly. + + +@pytest.mark.parametrize("strategy", sorted(_STRATEGY_MODULES)) +def test_ac7_current_strategy_label_matches_config( + monkeypatch, strategy_module_cleanup, strategy +) -> None: + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + _install_fake_strategy(strategy) + config = _config_with_strategy(strategy) + instance = build_vio_strategy(config, fdr_client=object()) + assert instance.current_strategy_label() == strategy + assert ( + instance.current_strategy_label() == config.components["c1_vio"].strategy + ) + + +# ---------------------------------------------------------------------- +# AC-8: contract file matches Protocol shape. + + +_METHOD_TABLE_RE = re.compile(r"^\|\s*`(?P[a-z_][a-z0-9_]*)`\s*\|", re.MULTILINE) + + +def _methods_from_contract() -> set[str]: + text = _CONTRACT_PATH.read_text(encoding="utf-8") + surface_start = text.index("### Protocol surface") + next_section = text.find("\n### ", surface_start + len("### Protocol surface")) + section = text[surface_start:next_section] if next_section != -1 else text[surface_start:] + return {m.group("name") for m in _METHOD_TABLE_RE.finditer(section)} + + +def _protocol_methods(proto: type) -> set[str]: + return { + name + for name in dir(proto) + if not name.startswith("_") and callable(getattr(proto, name)) + } + + +def test_ac8_contract_methods_match_protocol() -> None: + contract_methods = _methods_from_contract() + protocol_methods = _protocol_methods(VioStrategy) + missing_in_protocol = contract_methods - protocol_methods + missing_in_contract = protocol_methods - contract_methods + assert not missing_in_protocol, ( + "Methods declared in vio_strategy_protocol.md Shape section but " + f"missing from the Protocol: {sorted(missing_in_protocol)}" + ) + assert not missing_in_contract, ( + "Methods present on the Protocol but missing from the contract " + f"Shape section: {sorted(missing_in_contract)}" + ) + + +def test_ac8_contract_lists_all_three_error_subtypes() -> None: + text = _CONTRACT_PATH.read_text(encoding="utf-8") + for name in {"VioInitializingError", "VioDegradedError", "VioFatalError"}: + assert name in text, ( + f"Contract file is missing the documented error subtype {name!r}" + ) + + +# ---------------------------------------------------------------------- +# AC-9: VioOutput.frame_id echo invariant is typed. + + +def test_ac9_vio_output_frame_id_is_typed_str() -> None: + """``VioOutput.frame_id`` annotation is ``str`` per AZ-331 AC-9. + + With ``from __future__ import annotations`` PEP-563 stringifies + every annotation at module load, so ``__annotations__`` returns + the literal ``'str'``. Compare against the string to avoid the + full ``get_type_hints`` forward-ref resolution path (which would + try to resolve neighbouring TYPE_CHECKING-only names like + :class:`SE3`). + """ + annotation = VioOutput.__annotations__["frame_id"] + assert annotation == "str", ( + f"frame_id annotation should be 'str'; got {annotation!r}" + ) + + +def test_ac9_vio_output_docstring_documents_echo_invariant() -> None: + docstring = VioOutput.__doc__ or "" + assert "echo" in docstring.lower(), ( + "VioOutput docstring must document the frame_id echo invariant " + "(MUST equal NavCameraFrame.frame_id from the input frame)" + ) + assert "frame_id" in docstring.lower() + + +# ---------------------------------------------------------------------- +# NFRs. + + +@pytest.mark.parametrize( + "exc_type", + [VioInitializingError, VioDegradedError, VioFatalError], +) +def test_nfr_reliability_all_vio_errors_subclass_family(exc_type) -> None: + assert issubclass(exc_type, VioError) + + +def test_nfr_reliability_strategy_not_available_not_in_family() -> None: + assert not issubclass(StrategyNotAvailableError, VioError) + + +def test_nfr_perf_factory_under_200ms_p99( + monkeypatch, strategy_module_cleanup +) -> None: + """Factory p99 ≤ 200 ms across 1000 calls (NFR-perf-factory).""" + strategy = "klt_ransac" + _, _, flag = _STRATEGY_MODULES[strategy] + monkeypatch.setenv(flag, "ON") + _install_fake_strategy(strategy) + config = _config_with_strategy(strategy) + + durations_ms: list[float] = [] + for _ in range(1000): + t0 = time.perf_counter() + build_vio_strategy(config, fdr_client=object()) + durations_ms.append((time.perf_counter() - t0) * 1000.0) + + durations_ms.sort() + p99 = durations_ms[int(0.99 * len(durations_ms))] + assert p99 <= 200.0, ( + f"build_vio_strategy() p99={p99:.3f} ms exceeds 200 ms NFR" + ) + + +# ---------------------------------------------------------------------- +# Surface coverage. + + +def test_vio_state_enum_surface() -> None: + assert {v.value for v in VioState} == {"init", "tracking", "degraded", "lost"} + + +def test_c1_config_lost_frame_threshold_validation() -> None: + with pytest.raises(ConfigError): + C1VioConfig(lost_frame_threshold=0) + with pytest.raises(ConfigError): + C1VioConfig(lost_frame_threshold=-1) + + +def test_c1_config_warm_start_max_frames_validation() -> None: + with pytest.raises(ConfigError): + C1VioConfig(warm_start_max_frames=0) + + +def test_feature_quality_dto_constructs_and_freezes() -> None: + fq = _neutral_feature_quality() + with pytest.raises(dataclasses.FrozenInstanceError): + fq.mre_px = 99.0 # type: ignore[misc] + + +def test_warm_start_pose_constructs_with_zero_bias() -> None: + hint = WarmStartPose( + body_T_world=gtsam.Pose3(np.eye(4)), + velocity_b=(0.0, 0.0, 0.0), + bias=_zero_bias(), + captured_at_ns=1_000_000_000, + ) + assert hint.captured_at_ns == 1_000_000_000 + assert hint.bias.accel_bias == (0.0, 0.0, 0.0) diff --git a/tests/unit/c1_vio/test_smoke.py b/tests/unit/c1_vio/test_smoke.py deleted file mode 100644 index d7b0d71..0000000 --- a/tests/unit/c1_vio/test_smoke.py +++ /dev/null @@ -1,9 +0,0 @@ -"""C1 VIO smoke test — AZ-263 AC-9: verify the component interface is importable.""" - - -def test_interface_importable() -> None: - # Assert - from gps_denied_onboard.components.c1_vio import VioOutput, VioStrategy - - assert VioStrategy is not None - assert VioOutput is not None diff --git a/tests/unit/c5_state/test_az381_state_protocol.py b/tests/unit/c5_state/test_az381_state_protocol.py index bbeb2ea..558b181 100644 --- a/tests/unit/c5_state/test_az381_state_protocol.py +++ b/tests/unit/c5_state/test_az381_state_protocol.py @@ -24,7 +24,7 @@ from gps_denied_onboard._types.state import ( PoseSourceLabel, Quat, ) -from gps_denied_onboard._types.vio import VioOutput +from gps_denied_onboard._types.nav import VioOutput from gps_denied_onboard.components.c5_state import ( C5StateConfig, EstimatorDegradedError, diff --git a/tests/unit/c5_state/test_az383_factor_adds.py b/tests/unit/c5_state/test_az383_factor_adds.py index 356cc7f..68cb9d5 100644 --- a/tests/unit/c5_state/test_az383_factor_adds.py +++ b/tests/unit/c5_state/test_az383_factor_adds.py @@ -32,14 +32,19 @@ import numpy as np import pytest from gps_denied_onboard._types.geo import LatLonAlt -from gps_denied_onboard._types.nav import ImuSample, ImuWindow +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + ImuSample, + ImuWindow, + VioOutput, +) from gps_denied_onboard._types.pose import ( CovarianceMode, PoseEstimate, PoseSourceLabel, Quat, ) -from gps_denied_onboard._types.vio import VioOutput from gps_denied_onboard.components.c5_state.config import C5StateConfig from gps_denied_onboard.components.c5_state.errors import EstimatorDegradedError from gps_denied_onboard.components.c5_state.gtsam_isam2_estimator import ( @@ -73,12 +78,21 @@ def _build_estimator(*, with_stub_handle: bool = True) -> GtsamIsam2StateEstimat return estimator +_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) +_NEUTRAL_FQ = FeatureQuality( + tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0, +) + + def _make_vio(*, frame_id: int, t_seconds: float, pose: np.ndarray | None = None) -> VioOutput: + matrix = pose if pose is not None else np.eye(4) return VioOutput( - frame_id=frame_id, - timestamp=datetime.fromtimestamp(t_seconds, tz=timezone.utc), - pose_se3=pose if pose is not None else np.eye(4), - covariance_6x6=np.eye(6) * 0.01, + frame_id=str(frame_id), + relative_pose_T=gtsam.Pose3(matrix), + pose_covariance_6x6=np.eye(6) * 0.01, + imu_bias=_ZERO_BIAS, + feature_quality=_NEUTRAL_FQ, + emitted_at_ns=int(t_seconds * 1_000_000_000), ) diff --git a/tests/unit/c5_state/test_az386_eskf_baseline.py b/tests/unit/c5_state/test_az386_eskf_baseline.py index 1e55fc1..23dac65 100644 --- a/tests/unit/c5_state/test_az386_eskf_baseline.py +++ b/tests/unit/c5_state/test_az386_eskf_baseline.py @@ -36,10 +36,15 @@ import pytest from gps_denied_onboard._types.fc import GpsHealth, GpsStatus from gps_denied_onboard._types.geo import LatLonAlt -from gps_denied_onboard._types.nav import ImuSample, ImuWindow +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + ImuSample, + ImuWindow, + VioOutput, +) from gps_denied_onboard._types.pose import CovarianceMode, PoseEstimate, Quat from gps_denied_onboard._types.state import IsamState, PoseSourceLabel -from gps_denied_onboard._types.vio import VioOutput from gps_denied_onboard.components.c5_state import ( C5StateConfig, EstimatorFatalError, @@ -104,18 +109,26 @@ def _pose_translated(x: float, y: float, z: float) -> np.ndarray: return p +_ESKF_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) +_ESKF_NEUTRAL_FQ = FeatureQuality( + tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0, +) + + def _vio( frame_id: int, ts_ns: int, pose: np.ndarray, cov: np.ndarray | None = None, ) -> VioOutput: - # Default sigma ≈ 0.1 m / 0.1 rad — realistic VIO uncertainty. + import gtsam return VioOutput( - frame_id=frame_id, - timestamp=datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc), - pose_se3=pose, - covariance_6x6=cov if cov is not None else np.eye(6) * 0.01, + frame_id=str(frame_id), + relative_pose_T=gtsam.Pose3(pose), + pose_covariance_6x6=cov if cov is not None else np.eye(6) * 0.01, + imu_bias=_ESKF_ZERO_BIAS, + feature_quality=_ESKF_NEUTRAL_FQ, + emitted_at_ns=int(ts_ns), ) diff --git a/tests/unit/c5_state/test_az490_set_takeoff_origin.py b/tests/unit/c5_state/test_az490_set_takeoff_origin.py index 8f3bc0f..9cad9b0 100644 --- a/tests/unit/c5_state/test_az490_set_takeoff_origin.py +++ b/tests/unit/c5_state/test_az490_set_takeoff_origin.py @@ -30,8 +30,12 @@ from gps_denied_onboard._types.pose import ( PoseEstimate, Quat, ) +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + VioOutput, +) from gps_denied_onboard._types.state import PoseSourceLabel -from gps_denied_onboard._types.vio import VioOutput from gps_denied_onboard.components.c5_state import ( C5StateConfig, EstimatorAlreadyStartedError, @@ -95,12 +99,21 @@ def _origin() -> LatLonAlt: return LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0) +_AZ490_ZERO_BIAS = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)) +_AZ490_NEUTRAL_FQ = FeatureQuality( + tracked=20, new=2, lost=1, mean_parallax=5.0, mre_px=1.0, +) + + def _vio(*, frame_id: int, t_seconds: float) -> VioOutput: + import gtsam return VioOutput( - frame_id=frame_id, - timestamp=datetime.fromtimestamp(t_seconds, tz=timezone.utc), - pose_se3=np.eye(4), - covariance_6x6=np.eye(6) * 0.01, + frame_id=str(frame_id), + relative_pose_T=gtsam.Pose3(np.eye(4)), + pose_covariance_6x6=np.eye(6) * 0.01, + imu_bias=_AZ490_ZERO_BIAS, + feature_quality=_AZ490_NEUTRAL_FQ, + emitted_at_ns=int(t_seconds * 1_000_000_000), ) diff --git a/tests/unit/test_types_importable.py b/tests/unit/test_types_importable.py index 605c728..321f76f 100644 --- a/tests/unit/test_types_importable.py +++ b/tests/unit/test_types_importable.py @@ -9,14 +9,24 @@ def test_types_modules_importable() -> None: from gps_denied_onboard._types import ( calibration, emitted, + inference, manifests, matching, nav, pose, tile, - vio, vpr, ) - for mod in (nav, vio, vpr, matching, pose, tile, calibration, emitted, manifests): + for mod in ( + nav, + vpr, + matching, + pose, + tile, + calibration, + emitted, + manifests, + inference, + ): assert mod is not None