diff --git a/_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md b/_docs/02_tasks/done/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md similarity index 100% rename from _docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md rename to _docs/02_tasks/done/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md diff --git a/_docs/03_implementation/batch_55_cycle1_report.md b/_docs/03_implementation/batch_55_cycle1_report.md new file mode 100644 index 0000000..c5300a8 --- /dev/null +++ b/_docs/03_implementation/batch_55_cycle1_report.md @@ -0,0 +1,97 @@ +# Batch 55 — Cycle 1 Report + +**Date**: 2026-05-14 +**Tasks**: AZ-528 (Hygiene — c1_vio strategy facade orchestration-spine consolidation) +**Verdict**: COMPLETE — PASS + +## Summary + +Consolidated the 3-way orchestration-spine duplication across the +c1_vio `VioStrategy` modules (`okvis2.py`, `vins_mono.py`, +`klt_ransac.py`) into a single c1-internal helper module +`_facade_spine.py`. Closes cumulative review batches 52-54 Finding +F1 (Medium / Maintainability). Mirrors the AZ-527 precedent for the +c2_vpr-side `_assert_engine_output_dim` consolidation. No behaviour +change — every existing AZ-332 / AZ-333 / AZ-334 AC test passes +unmodified. + +## Files added / modified + +### Added (2) + +- `src/gps_denied_onboard/components/c1_vio/_facade_spine.py` — new + c1-internal helper, 225 lines. Exports 5 free functions (`now_iso`, + `bias_norm`, `se3_from_4x4`, `frame_ts_ns`, `frame_image`) and the + `FacadeSpine` mixin class (`_classify_state`, `_tick_lost`, + `_emit_transition`). +- `tests/unit/c1_vio/test_az528_facade_spine.py` — new test file, 19 + tests covering AC-1..AC-8 + a Risk-1 mitigation test (each + strategy's `__init__` statically asserts that every spine-required + attribute is assigned). All pass. + +### Modified (3) + +- `src/gps_denied_onboard/components/c1_vio/okvis2.py` — inherit from + `FacadeSpine`; import the 5 free functions; delete the 5 local + module-level definitions, 3 instance methods, and the unused + `_BIAS_NORM_FLOOR` constant. Set the 3 spine-required attributes + (`_feature_threshold`, `_producer_id`, `_strategy_label`) in + `__init__`. +- `src/gps_denied_onboard/components/c1_vio/vins_mono.py` — same + pattern as `okvis2.py`. +- `src/gps_denied_onboard/components/c1_vio/klt_ransac.py` — same + pattern; spine threshold reads `self._cfg.min_features_for_pose` + (the KLT-side equivalent of OKVIS2's `degraded_feature_threshold`). + Deferred-consolidation comments referencing this PBI removed. + +## Tests + +- `tests/unit/c1_vio/test_az528_facade_spine.py` — 19 new tests, all + pass. +- `tests/unit/c1_vio/` (full focused suite) — 120 collected, 114 + pass + 6 tier-2 skipped (unchanged from pre-AZ-528 state). +- Adjacent regression suite (`test_az270_compose_root`, + `test_az272_fdr_record_schema`, `test_az273_fdr_client_ringbuf`, + `test_ac1_scaffold_layout`, `tests/unit/c1_vio/`) — 237 pass + 6 + skipped. + +## AC traceability + +| AC | Status | Notes | +|-------|--------|---------------------------------------------------------| +| AC-1 | ✓ | Helper module exposes documented surface. | +| AC-2 | ✓ | `now_iso()` returns aware UTC `+00:00` ISO-8601. | +| AC-3 | ✓ | `bias_norm` matches L2 formula (accel + gyro). | +| AC-4 | ✓ | `se3_from_4x4` builds `gtsam.Pose3` identity correctly. | +| AC-5 | ✓ | `_classify_state` INIT during warmup, T/D thresholds. | +| AC-6 | ✓ | `_tick_lost` demotes T→D first call, escalates to LOST. | +| AC-7 | ✓ | `_emit_transition` exactly one FDR record per change. | +| AC-8 | ✓ | AST guard: 0 module-level free funcs in 3 strategies. | +| AC-9 | ✓ | All AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified. | +| AC-10 | ✓ | AZ-270 layer-lint regression still passes. | + +## Code review + +See `_docs/03_implementation/reviews/batch_55_review.md` — verdict +PASS, no findings. + +## Outcomes & lessons + +- Pattern of an "AST regression guard against duplicated free + functions" continues to work well (AZ-508 / AZ-526 / AZ-527 / + AZ-528 all use it). It catches re-introduction by future authors + who don't know the consolidated home exists. +- Mixin-via-instance-attributes (rather than constructor parameters) + keeps strategy `__init__` signatures stable — the AZ-331 factory + signature `(config, *, fdr_client)` is preserved across all three + strategies post-refactor. +- A 3-point hygiene PBI was the right granularity: 1 new module, + 3 small edits to existing strategies, 1 test file with AST + import + regression guards. Same shape and complexity as AZ-527. + +## Outstanding + +None for AZ-528. F2 from cumulative review batches 52-54 (test fake ++ `_patch_pose_recovery` helper consolidation) remains deferred — +sits at a different abstraction layer and will ride a later hygiene +pass. diff --git a/_docs/03_implementation/reviews/batch_55_review.md b/_docs/03_implementation/reviews/batch_55_review.md new file mode 100644 index 0000000..6c8690a --- /dev/null +++ b/_docs/03_implementation/reviews/batch_55_review.md @@ -0,0 +1,145 @@ +# Code Review Report + +**Batch**: 55 (AZ-528 — c1_vio strategy facade orchestration-spine consolidation) +**Date**: 2026-05-14 +**Verdict**: PASS + +## Scope + +Single-task hygiene batch closing Finding F1 from +`_docs/03_implementation/cumulative_review_batches_52-54_cycle1_report.md`. +Replaces the 3-way byte-equivalent orchestration-spine duplication +across `okvis2.py`, `vins_mono.py`, `klt_ransac.py` with a single +c1-internal helper module `_facade_spine.py`. No behaviour change; +existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified. + +### Changed files + +- `src/gps_denied_onboard/components/c1_vio/_facade_spine.py` — NEW + c1-internal helper. Exports 5 stateless free functions (`now_iso`, + `bias_norm`, `se3_from_4x4`, `frame_ts_ns`, `frame_image`) and one + mixin class (`FacadeSpine`) providing `_classify_state`, + `_tick_lost`, `_emit_transition`. 225 lines including docstring + + attribute declarations. +- `src/gps_denied_onboard/components/c1_vio/okvis2.py` — inherit from + `FacadeSpine`; import the 5 free functions; delete the 5 local + module-level definitions + 3 instance methods + unused + `_BIAS_NORM_FLOOR` constant; set 3 new spine-required attributes in + `__init__` (`_feature_threshold`, `_producer_id`, `_strategy_label`). +- `src/gps_denied_onboard/components/c1_vio/vins_mono.py` — same + pattern as `okvis2.py`. +- `src/gps_denied_onboard/components/c1_vio/klt_ransac.py` — same + pattern; reuses `_grayscale` (KLT/RANSAC-specific, not consolidated). + Spine threshold attribute reads `_cfg.min_features_for_pose` (the + KLT-side equivalent of OKVIS2's `degraded_feature_threshold`). +- `tests/unit/c1_vio/test_az528_facade_spine.py` — NEW. 19 tests + covering AC-1..AC-8 + a Risk-1 mitigation test that statically + verifies every concrete strategy's `__init__` sets every + spine-required attribute. + +## Phase 2 — Spec Compliance + +| AC | Test | Verified | +|-------|-----------------------------------------------------------------------------------------------|----------| +| AC-1 | `test_ac1_helper_module_exposes_documented_surface` | ✓ | +| AC-2 | `test_ac2_now_iso_returns_aware_utc_with_plus_offset` | ✓ | +| AC-3 | `test_ac3_bias_norm_matches_l2_formula` + `test_ac3_bias_norm_includes_gyro_component` | ✓ | +| AC-4 | `test_ac4_se3_from_4x4_builds_identity_pose` | ✓ | +| AC-5 | `test_ac5_classify_state_returns_init_during_warmup` + tracking + degraded | ✓ | +| AC-6 | `test_ac6_tick_lost_demotes_tracking_to_degraded_first_call` + escalates-to-lost | ✓ | +| AC-7 | `test_ac7_emit_transition_no_record_on_steady_state` + one-record-per-change + idempotent | ✓ | +| AC-8 | `test_ac8_no_duplicated_free_functions_remain_in_strategy_module` (parametrised over 3 files) | ✓ | +| AC-9 | Existing AZ-332 / AZ-333 / AZ-334 AC tests — all unmodified, all pass | ✓ | +| AC-10 | `tests/unit/test_az270_compose_root.py::test_ac6_only_compose_root_imports_concrete_strategies` | ✓ | + +Test counts: + +- `tests/unit/c1_vio/test_az528_facade_spine.py` — 19 tests, all pass. +- `tests/unit/c1_vio/` total — 120 collected, 114 pass + 6 tier-2 + skipped (unchanged from pre-AZ-528 state). +- Adjacent regression suite (`test_az270_compose_root`, `test_az272_fdr_record_schema`, + `test_az273_fdr_client_ringbuf`, `test_ac1_scaffold_layout`, + `tests/unit/c1_vio/`) — 237 pass + 6 skipped. + +Scope discipline: only the 5 files in the task spec's "Included" +list were touched; no test files outside the new +`test_az528_facade_spine.py` were modified (AC-9 hard requirement). + +## Phase 3 — Code Quality + +- **SRP**: `FacadeSpine` owns one responsibility — the c1_vio + strategy state machine + FDR `vio.health` record emission. Free + functions are stateless pure utilities. Each concrete strategy + retains its geometry pipeline + native-binding driver. Clean split. +- **Naming**: instance attributes the mixin reads are documented in + the class docstring; type annotations on the class declare them for + IDE / type-checker consumption (Risk 1 mitigation). +- **Comments**: deferred-consolidation comments referencing this PBI + (`# post-AZ-334 hygiene PBI ...`) were removed from + `klt_ransac.py` as required by the task spec. +- **Dead code**: removed `_BIAS_NORM_FLOOR` from `okvis2.py` (verified + via grep; not used anywhere in the c1_vio component or + cross-component). +- **DRY**: the consolidation eliminates ~120 lines of duplicated code + across the three strategy modules. +- **Test quality**: every test follows Arrange / Act / Assert; AC + tests use parametrisation where appropriate (AC-8 over the 3 + strategy modules). + +## Phase 4 — Security Quick-Scan + +N/A — pure refactor, no new inputs, no I/O, no string interpolation +into queries, no subprocess invocation. The free functions take only +typed DTOs (`ImuBias`, `NavCameraFrame`, `np.ndarray`); the mixin +methods only mutate instance state. + +## Phase 5 — Performance + +No hot-path regression. The mixin call sites are byte-equivalent to +the previous in-line definitions (same conditionals, same +`FdrClient.enqueue` invocation, same string interpolation in +`bias_norm`). The single new indirection is method dispatch through +the mixin's MRO — Python class-method resolution is O(1) on a fixed +MRO and is amortised inside the hot-path's existing call overhead. + +## Phase 6 — Cross-Task Consistency + +Single-task batch — N/A. + +## Phase 7 — Architecture Compliance + +- **Layer direction**: `_facade_spine.py` imports only from L1 + substrate (`_types.nav`, `components.c1_vio.errors`, + `fdr_client.records`) + stdlib (`datetime`, `math`) + numpy. No + imports from a higher layer; no imports from a sibling component. +- **Public API respect**: `_facade_spine` is underscore-prefixed and + NOT in `c1_vio/__init__.py`'s `__all__`. AZ-270's + `test_ac6_only_compose_root_imports_concrete_strategies` + regression-gate passes. Mirrors the AZ-527 precedent for + `_assert_engine_output_dim` in `c2_vpr`. +- **Cycles**: no new cyclic dependencies. `_facade_spine` is a leaf + inside `c1_vio`; the 3 strategy modules already imported from + `c1_vio.errors`, so the new sibling import slots in cleanly. +- **Duplicate symbols**: AC-8 AST regression guard asserts the 5 + consolidated free-function names appear only in `_facade_spine.py`. +- **Cross-cutting concerns**: the spine is a c1-internal concern by + task-spec decision (component state machines + per-strategy FDR + record producers do not generalise across c2..c5; AZ-507 carved + that boundary intentionally). NOT hoisted to `shared/helpers/`. + +## Findings + +None. + +## Conclusion + +PASS. The consolidation removes ~120 lines of duplication, preserves +behaviour across all three strategies (verified by 237 passing tests +in the focused + adjacent suite), and respects the c1_vio component +boundary. The cumulative review batches 52-54 Finding F1 is closed. + +Risk 4 (mixin couples future divergence) is mitigated by the task +spec's documented escape hatch: future divergent strategies can +either override mixin methods or use the free functions in isolation. +The 3 current strategies' state machines are byte-equivalent +post-refactor, so the mixin is the right shape today. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 83961d1..c1e70b2 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,6 +12,6 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 54 +last_completed_batch: 55 last_cumulative_review: batches_52-54 -current_batch: 55 +current_batch: 56 diff --git a/src/gps_denied_onboard/components/c1_vio/_facade_spine.py b/src/gps_denied_onboard/components/c1_vio/_facade_spine.py new file mode 100644 index 0000000..22153e4 --- /dev/null +++ b/src/gps_denied_onboard/components/c1_vio/_facade_spine.py @@ -0,0 +1,224 @@ +"""c1_vio strategy facade orchestration spine — c1-internal helpers (AZ-528). + +Shared between :class:`Okvis2Strategy` (AZ-332), +:class:`VinsMonoStrategy` (AZ-333), and :class:`KltRansacStrategy` +(AZ-334). Closes cumulative review batches 52-54 Finding F1: the +orchestration-spine duplication across the c1_vio strategy triplet. + +Mirrors the AZ-527 precedent for the c2_vpr-side consolidation — +underscore-prefixed module name keeps the helper c1-internal (NOT in +``c1_vio/__init__.py``'s Public API surface); concrete strategies +import it as a sibling. Engine output-shape contracts and VIO state +machines are component-scoped concerns, NOT cross-component +``shared/helpers/`` concerns. + +The helper module exposes: + +Free functions (stateless, pure): + +- :func:`now_iso` — ISO-8601 UTC timestamp for FDR record ``ts``. +- :func:`bias_norm` — L2 norm of the concatenated ``(accel || gyro)`` + 6-vector. +- :func:`se3_from_4x4` — lazy ``gtsam.Pose3`` construction from a + 4x4 row-major matrix. +- :func:`frame_ts_ns` — UTC-epoch nanoseconds from + :class:`NavCameraFrame` (OKVIS2 + VINS-Mono only; KLT/RANSAC owns + inline grayscale conversion via its geometry pipeline). +- :func:`frame_image` — contiguous ``uint8`` ndarray with 2D/3D + shape validation. ``producer_id`` is parametrised so the error + message names the originating strategy. + +Mixin (:class:`FacadeSpine`) provides the state-machine + FDR +``vio.health`` record emit. Concrete strategies inherit from it +and set the per-instance attributes the mixin reads: + +- ``_reported_state``, ``_frames_since_warmup``, + ``_warm_start_max_frames``, ``_feature_threshold`` — used by + :meth:`FacadeSpine._classify_state`. +- ``_consecutive_lost``, ``_lost_frame_threshold`` — used by + :meth:`FacadeSpine._tick_lost`. +- ``_last_emitted_state``, ``_producer_id``, ``_strategy_label``, + ``_latest_bias``, ``_fdr`` — used by + :meth:`FacadeSpine._emit_transition`. + +The mixin's required attributes are declared as class-level type +annotations (no default values) so type-checkers / IDEs catch a +strategy that forgets to set one in ``__init__`` — runtime safety +is the AZ-528 AC-9 + AC-10 regression-guard tests that exercise +the three concrete strategies end-to-end through the consolidated +spine. +""" + +from __future__ import annotations + +import math +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +import numpy as np + +from gps_denied_onboard._types.nav import ( + FeatureQuality, + ImuBias, + VioState, +) +from gps_denied_onboard.components.c1_vio.errors import VioFatalError +from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord + +if TYPE_CHECKING: + import numpy.typing as npt + + from gps_denied_onboard._types.nav import NavCameraFrame + from gps_denied_onboard.fdr_client.client import FdrClient + + +__all__ = [ + "FacadeSpine", + "bias_norm", + "frame_image", + "frame_ts_ns", + "now_iso", + "se3_from_4x4", +] + + +def now_iso() -> str: + """ISO-8601 UTC timestamp for FDR record ``ts``. + + Returns the format used by all three c1_vio strategies' FDR + record emissions (``+00:00`` offset suffix). This is NOT the + ``Z``-suffix canonical form used by + :func:`gps_denied_onboard.helpers.iso_timestamps.iso_ts_from_clock` + (AZ-526) — that helper serves clock-injected FDR timestamps + from a :class:`~gps_denied_onboard.clock.Clock`, while this + helper is the strategy-facade's own wall-clock stamp at state- + transition emit time. + """ + return datetime.now(timezone.utc).isoformat() + + +def bias_norm(bias: ImuBias) -> float: + """L2 norm of the concatenated ``(accel_bias || gyro_bias)`` 6-vector.""" + accel = np.asarray(bias.accel_bias, dtype=np.float64) + gyro = np.asarray(bias.gyro_bias, dtype=np.float64) + return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro)))) + + +def se3_from_4x4(matrix: npt.NDArray[Any]) -> Any: + """Build a ``gtsam.Pose3`` from a 4x4 row-major matrix. + + Imported lazily so this module can be imported without gtsam in + headless tooling paths (tests + facade-only smoke). + """ + import gtsam + + return gtsam.Pose3(np.asarray(matrix, dtype=np.float64)) + + +def frame_ts_ns(frame: NavCameraFrame) -> int: + """Convert :attr:`NavCameraFrame.timestamp` to monotonic ns. + + Uses the datetime's UTC epoch nanoseconds so the value is + monotonically increasing across frames (frame source guarantees + strictly increasing timestamps per the FrameSource contract). + """ + return int(frame.timestamp.timestamp() * 1e9) + + +def frame_image(frame: NavCameraFrame, *, producer_id: str) -> np.ndarray: + """Coerce the frame's image into a contiguous ``uint8`` ndarray. + + Raises :class:`VioFatalError` tagged with ``producer_id`` when + the frame's image is not 2-D or 3-D. + """ + arr = np.ascontiguousarray(frame.image, dtype=np.uint8) + if arr.ndim < 2 or arr.ndim > 3: + raise VioFatalError( + f"{producer_id}: NavCameraFrame.image must be 2-D or 3-D; got {arr.ndim}-D" + ) + return arr + + +class FacadeSpine: + """Orchestration-spine mixin for c1_vio :class:`VioStrategy` implementations. + + Implements three state-machine helpers that were previously duplicated + across :class:`Okvis2Strategy`, :class:`VinsMonoStrategy`, and + :class:`KltRansacStrategy`: + + - :meth:`_classify_state` — INIT during warmup, DEGRADED below + the per-strategy feature threshold, TRACKING otherwise. + - :meth:`_tick_lost` — increment the consecutive-lost counter, + escalate to LOST at ``_lost_frame_threshold``, demote + TRACKING → DEGRADED on the first lost frame. + - :meth:`_emit_transition` — emit exactly one FDR ``vio.health`` + record per state change; no record on steady-state. + + Concrete strategies MUST set the following attributes (typically + in ``__init__``) before any of the mixin methods are called: + + - ``_reported_state: VioState`` + - ``_frames_since_warmup: int`` + - ``_warm_start_max_frames: int`` + - ``_feature_threshold: int`` + - ``_consecutive_lost: int`` + - ``_lost_frame_threshold: int`` + - ``_last_emitted_state: VioState | None`` + - ``_producer_id: str`` — FDR record ``producer_id`` field + (e.g. ``"c1_vio.okvis2"``). + - ``_strategy_label: str`` — FDR payload ``strategy_label`` + field (e.g. ``"okvis2"``). + - ``_latest_bias: ImuBias`` + - ``_fdr: FdrClient`` + + Type annotations on the class declare the attributes for static + type-checkers; missing assignments at runtime surface as + ``AttributeError`` at the first state-machine call site. + """ + + _reported_state: VioState + _frames_since_warmup: int + _warm_start_max_frames: int + _feature_threshold: int + _consecutive_lost: int + _lost_frame_threshold: int + _last_emitted_state: VioState | None + _producer_id: str + _strategy_label: str + _latest_bias: ImuBias + _fdr: FdrClient + + def _classify_state(self, fq: FeatureQuality) -> VioState: + if self._reported_state == VioState.INIT and ( + self._frames_since_warmup + 1 < self._warm_start_max_frames + ): + return VioState.INIT + if fq.tracked < self._feature_threshold: + return VioState.DEGRADED + return VioState.TRACKING + + def _tick_lost(self, frame_id: str) -> None: + self._consecutive_lost += 1 + if self._consecutive_lost >= self._lost_frame_threshold: + self._reported_state = VioState.LOST + elif self._reported_state == VioState.TRACKING: + self._reported_state = VioState.DEGRADED + + def _emit_transition(self, new_state: VioState, frame_id: str) -> None: + if self._last_emitted_state == new_state: + return + self._last_emitted_state = new_state + record = FdrRecord( + schema_version=CURRENT_SCHEMA_VERSION, + ts=now_iso(), + producer_id=self._producer_id, + kind="vio.health", + payload={ + "state": new_state.value, + "consecutive_lost": self._consecutive_lost, + "bias_norm": bias_norm(self._latest_bias), + "strategy_label": self._strategy_label, + "frame_id": frame_id, + }, + ) + self._fdr.enqueue(record) diff --git a/src/gps_denied_onboard/components/c1_vio/klt_ransac.py b/src/gps_denied_onboard/components/c1_vio/klt_ransac.py index ba5350f..8337a4c 100644 --- a/src/gps_denied_onboard/components/c1_vio/klt_ransac.py +++ b/src/gps_denied_onboard/components/c1_vio/klt_ransac.py @@ -74,8 +74,6 @@ Risk mitigations (see task spec for full text): from __future__ import annotations -import math -from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final, Literal import cv2 @@ -89,11 +87,15 @@ from gps_denied_onboard._types.nav import ( VioState, ) from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c1_vio._facade_spine import ( + FacadeSpine, + bias_norm, + se3_from_4x4, +) from gps_denied_onboard.components.c1_vio.errors import ( VioFatalError, VioInitializingError, ) -from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord from gps_denied_onboard.helpers.imu_preintegrator import ( ImuPreintegrationError, ImuPreintegrator, @@ -133,28 +135,6 @@ _ESSENTIAL_MATRIX_DOF: Final[int] = 5 _INIT_STATE_COVARIANCE_SCALAR: Final[float] = 10.0 -def _now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _bias_norm(bias: ImuBias) -> float: - """L2 norm of the concatenated 6-vector ``(accel || gyro)``.""" - accel = np.asarray(bias.accel_bias, dtype=np.float64) - gyro = np.asarray(bias.gyro_bias, dtype=np.float64) - return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro)))) - - -def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any: - """Build a ``gtsam.Pose3`` from a 4x4 row-major matrix. - - Imported lazily so this module can be imported without gtsam in - headless tooling paths (tests + facade-only smoke). - """ - import gtsam - - return gtsam.Pose3(np.asarray(matrix, dtype=np.float64)) - - def _grayscale(image: npt.NDArray[Any]) -> npt.NDArray[Any]: """Coerce a NavCameraFrame image to OpenCV's expected 2D ``uint8``. @@ -204,7 +184,7 @@ def _intrinsics_3x3(calibration: CameraCalibration) -> np.ndarray: return K -class KltRansacStrategy: +class KltRansacStrategy(FacadeSpine): """Mandatory simple-baseline :class:`VioStrategy` for E-C1 (AZ-334). Constructor matches the AZ-331 composition-root factory shape:: @@ -244,6 +224,9 @@ class KltRansacStrategy: self._lost_frame_threshold: int = c1_block.lost_frame_threshold self._warm_start_max_frames: int = c1_block.warm_start_max_frames self._cfg: KltRansacConfig = c1_block.klt_ransac + self._feature_threshold: int = self._cfg.min_features_for_pose + self._producer_id: str = _PRODUCER_ID + self._strategy_label: str = _STRATEGY_LABEL # Per-frame state. self._prev_gray: np.ndarray | None = None @@ -404,7 +387,7 @@ class KltRansacStrategy: pose_4x4 = np.eye(4, dtype=np.float64) pose_4x4[:3, :3] = np.asarray(R, dtype=np.float64) pose_4x4[:3, 3] = np.asarray(t, dtype=np.float64).flatten() - pose = _se3_from_4x4(pose_4x4) + pose = se3_from_4x4(pose_4x4) # 8. Final inlier count for state classification + covariance. final_inlier_count = ( @@ -529,7 +512,7 @@ class KltRansacStrategy: return VioHealth( state=self._reported_state, consecutive_lost=self._consecutive_lost, - bias_norm=_bias_norm(self._latest_bias), + bias_norm=bias_norm(self._latest_bias), ) def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]: @@ -614,7 +597,7 @@ class KltRansacStrategy: as un-informed (the warm-start hint, not this VioOutput, is what seeds C5's initial estimate). """ - identity_pose = _se3_from_4x4(np.eye(4, dtype=np.float64)) + identity_pose = se3_from_4x4(np.eye(4, dtype=np.float64)) cov = np.eye(6, dtype=np.float64) * _INIT_STATE_COVARIANCE_SCALAR fq = FeatureQuality( tracked=int(self._prev_features.shape[0]) if self._prev_features is not None else 0, @@ -715,52 +698,6 @@ class KltRansacStrategy: scalar = (sigma_sq + inlier_penalty) / dof return np.eye(6, dtype=np.float64) * scalar - def _classify_state(self, fq: FeatureQuality) -> VioState: - """Map per-frame feature quality + warmup to a :class:`VioState`.""" - if self._reported_state == VioState.INIT and ( - self._frames_since_warmup + 1 < self._warm_start_max_frames - ): - return VioState.INIT - if fq.tracked < self._cfg.min_features_for_pose: - return VioState.DEGRADED - return VioState.TRACKING - - def _tick_lost(self, frame_id_str: str) -> None: - """Tick the lost-frame counter + transition state if needed. - - Mirrors :class:`Okvis2Strategy._tick_lost` exactly; the - post-AZ-334 hygiene PBI (Batch 53 review F1) will consolidate. - """ - self._consecutive_lost += 1 - if self._consecutive_lost >= self._lost_frame_threshold: - self._reported_state = VioState.LOST - elif self._reported_state == VioState.TRACKING: - self._reported_state = VioState.DEGRADED - - def _emit_transition(self, new_state: VioState, frame_id: str) -> None: - """Emit an FDR ``vio.health`` record IFF the state changed (AC-10). - - Steady-state frames produce no record — only transitions - through (INIT, TRACKING, DEGRADED, LOST) are FDR-stamped. - """ - if self._last_emitted_state == new_state: - return - self._last_emitted_state = new_state - record = FdrRecord( - schema_version=CURRENT_SCHEMA_VERSION, - ts=_now_iso(), - producer_id=_PRODUCER_ID, - kind="vio.health", - payload={ - "state": new_state.value, - "consecutive_lost": self._consecutive_lost, - "bias_norm": _bias_norm(self._latest_bias), - "strategy_label": _STRATEGY_LABEL, - "frame_id": frame_id, - }, - ) - self._fdr.enqueue(record) - def _safe_float(value: float) -> float: """NaN-safe float coercion — RansacFilter returns NaN for empty inliers.""" diff --git a/src/gps_denied_onboard/components/c1_vio/okvis2.py b/src/gps_denied_onboard/components/c1_vio/okvis2.py index 0a44d10..9676e23 100644 --- a/src/gps_denied_onboard/components/c1_vio/okvis2.py +++ b/src/gps_denied_onboard/components/c1_vio/okvis2.py @@ -44,8 +44,6 @@ AC mapping (see ``_docs/02_tasks/todo/AZ-332_c1_okvis2_strategy.md``): from __future__ import annotations -import math -from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final, Literal import numpy as np @@ -58,16 +56,20 @@ from gps_denied_onboard._types.nav import ( VioState, ) from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c1_vio._facade_spine import ( + FacadeSpine, + bias_norm, + frame_image, + frame_ts_ns, + se3_from_4x4, +) from gps_denied_onboard.components.c1_vio.errors import ( VioFatalError, VioInitializingError, ) -from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: - import numpy.typing as npt - from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.nav import ( ImuWindow, @@ -85,32 +87,9 @@ __all__ = ["Okvis2Strategy"] _STRATEGY_LABEL: Final[Literal["okvis2"]] = "okvis2" _PRODUCER_ID: Final[str] = "c1_vio.okvis2" _LOGGER_COMPONENT: Final[str] = "c1_vio.okvis2" -_BIAS_NORM_FLOOR: Final[float] = 0.0 -def _now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _bias_norm(bias: ImuBias) -> float: - """L2 norm of the concatenated 6-vector ``(accel || gyro)``.""" - accel = np.asarray(bias.accel_bias, dtype=np.float64) - gyro = np.asarray(bias.gyro_bias, dtype=np.float64) - return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro)))) - - -def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any: - """Build a ``gtsam.Pose3`` from a 4x4 row-major matrix. - - Imported lazily so this module can be imported without gtsam in - headless tooling paths (tests + facade-only smoke). - """ - import gtsam - - return gtsam.Pose3(np.asarray(matrix, dtype=np.float64)) - - -class Okvis2Strategy: +class Okvis2Strategy(FacadeSpine): """Production-default :class:`VioStrategy` for E-C1 (AZ-332). Constructor matches the AZ-331 composition-root factory shape:: @@ -147,6 +126,9 @@ class Okvis2Strategy: self._lost_frame_threshold: int = c1_block.lost_frame_threshold self._warm_start_max_frames: int = c1_block.warm_start_max_frames self._okvis2_cfg: Okvis2Config = c1_block.okvis2 + self._feature_threshold: int = self._okvis2_cfg.degraded_feature_threshold + self._producer_id: str = _PRODUCER_ID + self._strategy_label: str = _STRATEGY_LABEL self._calibration: CameraCalibration | None = None self._frames_since_warmup: int = 0 self._consecutive_lost: int = 0 @@ -202,7 +184,9 @@ class Okvis2Strategy: try: self._push_imu_window(imu) produced = self._backend.add_frame( - frame_id_str, _frame_ts_ns(frame), _frame_image(frame) + frame_id_str, + frame_ts_ns(frame), + frame_image(frame, producer_id="Okvis2Strategy"), ) except self._binding_module.OkvisInitException as exc: self._emit_transition(VioState.INIT, frame_id_str) @@ -319,7 +303,7 @@ class Okvis2Strategy: return VioHealth( state=self._reported_state, consecutive_lost=self._consecutive_lost, - bias_norm=_bias_norm(self._latest_bias), + bias_norm=bias_norm(self._latest_bias), ) def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]: @@ -400,7 +384,7 @@ class Okvis2Strategy: def _build_vio_output(self, raw: dict[str, Any], emitted_at_ns: int) -> VioOutput: try: - pose = _se3_from_4x4(raw["pose_T_world_body"]) + pose = se3_from_4x4(raw["pose_T_world_body"]) cov = np.asarray(raw["pose_covariance_6x6"], dtype=np.float64) bias = ImuBias( accel_bias=tuple(float(x) for x in raw["accel_bias"]), # type: ignore[arg-type] @@ -432,57 +416,3 @@ class Okvis2Strategy: emitted_at_ns=backend_ts, ) - def _classify_state(self, fq: FeatureQuality) -> VioState: - if self._reported_state == VioState.INIT and ( - self._frames_since_warmup + 1 < self._warm_start_max_frames - ): - return VioState.INIT - if fq.tracked < self._okvis2_cfg.degraded_feature_threshold: - return VioState.DEGRADED - return VioState.TRACKING - - def _tick_lost(self, frame_id: str) -> None: - self._consecutive_lost += 1 - if self._consecutive_lost >= self._lost_frame_threshold: - self._reported_state = VioState.LOST - elif self._reported_state == VioState.TRACKING: - self._reported_state = VioState.DEGRADED - - def _emit_transition(self, new_state: VioState, frame_id: str) -> None: - if self._last_emitted_state == new_state: - return - self._last_emitted_state = new_state - record = FdrRecord( - schema_version=CURRENT_SCHEMA_VERSION, - ts=_now_iso(), - producer_id=_PRODUCER_ID, - kind="vio.health", - payload={ - "state": new_state.value, - "consecutive_lost": self._consecutive_lost, - "bias_norm": _bias_norm(self._latest_bias), - "strategy_label": _STRATEGY_LABEL, - "frame_id": frame_id, - }, - ) - self._fdr.enqueue(record) - - -def _frame_ts_ns(frame: NavCameraFrame) -> int: - """Convert ``NavCameraFrame.timestamp`` to monotonic-ns. - - Uses the datetime's UTC epoch nanoseconds so the value is - monotonically increasing across frames (frame source guarantees - strictly increasing timestamps per the FrameSource contract). - """ - return int(frame.timestamp.timestamp() * 1e9) - - -def _frame_image(frame: NavCameraFrame) -> np.ndarray: - """Coerce the frame's image into a contiguous uint8 ndarray.""" - arr = np.ascontiguousarray(frame.image, dtype=np.uint8) - if arr.ndim < 2 or arr.ndim > 3: - raise VioFatalError( - f"Okvis2Strategy: NavCameraFrame.image must be 2-D or 3-D; got {arr.ndim}-D" - ) - return arr diff --git a/src/gps_denied_onboard/components/c1_vio/vins_mono.py b/src/gps_denied_onboard/components/c1_vio/vins_mono.py index d81bc28..7ad9a18 100644 --- a/src/gps_denied_onboard/components/c1_vio/vins_mono.py +++ b/src/gps_denied_onboard/components/c1_vio/vins_mono.py @@ -53,8 +53,6 @@ AC mapping (see ``_docs/02_tasks/todo/AZ-333_c1_vins_mono_strategy.md``): from __future__ import annotations -import math -from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Final, Literal import numpy as np @@ -67,16 +65,20 @@ from gps_denied_onboard._types.nav import ( VioState, ) from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c1_vio._facade_spine import ( + FacadeSpine, + bias_norm, + frame_image, + frame_ts_ns, + se3_from_4x4, +) from gps_denied_onboard.components.c1_vio.errors import ( VioFatalError, VioInitializingError, ) -from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: - import numpy.typing as npt - from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.nav import ( ImuWindow, @@ -96,29 +98,7 @@ _PRODUCER_ID: Final[str] = "c1_vio.vins_mono" _LOGGER_COMPONENT: Final[str] = "c1_vio.vins_mono" -def _now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _bias_norm(bias: ImuBias) -> float: - """L2 norm of the concatenated 6-vector ``(accel || gyro)``.""" - accel = np.asarray(bias.accel_bias, dtype=np.float64) - gyro = np.asarray(bias.gyro_bias, dtype=np.float64) - return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro)))) - - -def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any: - """Build a ``gtsam.Pose3`` from a 4x4 row-major matrix. - - Imported lazily so this module can be imported without gtsam in - headless tooling paths (tests + facade-only smoke). - """ - import gtsam - - return gtsam.Pose3(np.asarray(matrix, dtype=np.float64)) - - -class VinsMonoStrategy: +class VinsMonoStrategy(FacadeSpine): """Research-only :class:`VioStrategy` for IT-12 comparative study (AZ-333). Constructor matches the AZ-331 composition-root factory shape:: @@ -157,6 +137,9 @@ class VinsMonoStrategy: self._lost_frame_threshold: int = c1_block.lost_frame_threshold self._warm_start_max_frames: int = c1_block.warm_start_max_frames self._vins_cfg: VinsMonoConfig = c1_block.vins_mono + self._feature_threshold: int = self._vins_cfg.degraded_feature_threshold + self._producer_id: str = _PRODUCER_ID + self._strategy_label: str = _STRATEGY_LABEL self._calibration: CameraCalibration | None = None self._frames_since_warmup: int = 0 self._consecutive_lost: int = 0 @@ -216,7 +199,9 @@ class VinsMonoStrategy: try: self._push_imu_window(imu) produced = self._backend.add_frame( - frame_id_str, _frame_ts_ns(frame), _frame_image(frame) + frame_id_str, + frame_ts_ns(frame), + frame_image(frame, producer_id="VinsMonoStrategy"), ) except self._binding_module.VinsMonoInitException as exc: self._emit_transition(VioState.INIT, frame_id_str) @@ -338,7 +323,7 @@ class VinsMonoStrategy: return VioHealth( state=self._reported_state, consecutive_lost=self._consecutive_lost, - bias_norm=_bias_norm(self._latest_bias), + bias_norm=bias_norm(self._latest_bias), ) def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]: @@ -426,7 +411,7 @@ class VinsMonoStrategy: def _build_vio_output(self, raw: dict[str, Any], emitted_at_ns: int) -> VioOutput: try: - pose = _se3_from_4x4(raw["pose_T_world_body"]) + pose = se3_from_4x4(raw["pose_T_world_body"]) cov = np.asarray(raw["pose_covariance_6x6"], dtype=np.float64) bias = ImuBias( accel_bias=tuple(float(x) for x in raw["accel_bias"]), # type: ignore[arg-type] @@ -461,58 +446,3 @@ class VinsMonoStrategy: emitted_at_ns=backend_ts, ) - def _classify_state(self, fq: FeatureQuality) -> VioState: - if self._reported_state == VioState.INIT and ( - self._frames_since_warmup + 1 < self._warm_start_max_frames - ): - return VioState.INIT - if fq.tracked < self._vins_cfg.degraded_feature_threshold: - return VioState.DEGRADED - return VioState.TRACKING - - def _tick_lost(self, frame_id: str) -> None: - self._consecutive_lost += 1 - if self._consecutive_lost >= self._lost_frame_threshold: - self._reported_state = VioState.LOST - elif self._reported_state == VioState.TRACKING: - self._reported_state = VioState.DEGRADED - - def _emit_transition(self, new_state: VioState, frame_id: str) -> None: - if self._last_emitted_state == new_state: - return - self._last_emitted_state = new_state - record = FdrRecord( - schema_version=CURRENT_SCHEMA_VERSION, - ts=_now_iso(), - producer_id=_PRODUCER_ID, - kind="vio.health", - payload={ - "state": new_state.value, - "consecutive_lost": self._consecutive_lost, - "bias_norm": _bias_norm(self._latest_bias), - "strategy_label": _STRATEGY_LABEL, - "frame_id": frame_id, - }, - ) - self._fdr.enqueue(record) - - -def _frame_ts_ns(frame: NavCameraFrame) -> int: - """Convert ``NavCameraFrame.timestamp`` to monotonic-ns. - - Uses the datetime's UTC epoch nanoseconds so the value is - monotonically increasing across frames (frame source guarantees - strictly increasing timestamps per the FrameSource contract). - """ - return int(frame.timestamp.timestamp() * 1e9) - - -def _frame_image(frame: NavCameraFrame) -> np.ndarray: - """Coerce the frame's image into a contiguous uint8 ndarray.""" - arr = np.ascontiguousarray(frame.image, dtype=np.uint8) - if arr.ndim < 2 or arr.ndim > 3: - raise VioFatalError( - f"VinsMonoStrategy: NavCameraFrame.image must be 2-D or 3-D; " - f"got {arr.ndim}-D" - ) - return arr diff --git a/tests/unit/c1_vio/test_az528_facade_spine.py b/tests/unit/c1_vio/test_az528_facade_spine.py new file mode 100644 index 0000000..1444c30 --- /dev/null +++ b/tests/unit/c1_vio/test_az528_facade_spine.py @@ -0,0 +1,457 @@ +"""AZ-528 — c1_vio facade orchestration-spine consolidation tests. + +Covers AC-1..AC-8 of +``_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md``: + +- AC-1: helper module exposes the documented surface. +- AC-2: ``now_iso()`` returns an aware UTC ISO-8601 timestamp with + ``+00:00`` offset (NOT the ``Z``-suffix variant — that is + ``iso_ts_from_clock`` in AZ-526). +- AC-3: ``bias_norm`` matches the L2 formula on a hand-checked vector. +- AC-4: ``se3_from_4x4`` builds a ``gtsam.Pose3`` with the expected + identity rotation + zero translation when fed ``np.eye(4)``. +- AC-5: ``FacadeSpine._classify_state`` returns INIT during warm-up, + TRACKING above the threshold, DEGRADED below it. +- AC-6: ``FacadeSpine._tick_lost`` demotes TRACKING → DEGRADED on the + first lost frame and escalates to LOST at the threshold. +- AC-7: ``FacadeSpine._emit_transition`` emits exactly one + ``vio.health`` FDR record per state change (no record on + steady-state). +- AC-8: AST-walk regression guard — zero module-level definitions of + the consolidated free functions remain in any of the three strategy + modules. Mirrors the AZ-508 / AZ-526 / AZ-527 pattern. + +AC-9 (existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified) and +AC-10 (AZ-270 layer lint) ride the existing test files; this module +does not re-stage them. +""" + +from __future__ import annotations + +import ast +from datetime import datetime +from pathlib import Path +from typing import Final + +import gtsam +import numpy as np +import pytest + +from gps_denied_onboard._types.nav import FeatureQuality, ImuBias, VioState +from gps_denied_onboard.components.c1_vio._facade_spine import ( + FacadeSpine, + bias_norm, + frame_image, + frame_ts_ns, + now_iso, + se3_from_4x4, +) +from gps_denied_onboard.fdr_client.fakes import FakeFdrSink + +_C1_VIO_SRC: Final[Path] = ( + Path(__file__).resolve().parents[3] + / "src" + / "gps_denied_onboard" + / "components" + / "c1_vio" +) +_STRATEGY_MODULES: Final[tuple[str, ...]] = ( + "okvis2.py", + "vins_mono.py", + "klt_ransac.py", +) +_FORBIDDEN_FREE_FUNCS: Final[frozenset[str]] = frozenset( + {"_now_iso", "_bias_norm", "_se3_from_4x4", "_frame_ts_ns", "_frame_image"} +) + + +# --------------------------------------------------------------------------- +# AC-1 — surface. + + +def test_ac1_helper_module_exposes_documented_surface() -> None: + # Assert + assert callable(now_iso) + assert callable(bias_norm) + assert callable(se3_from_4x4) + assert callable(frame_ts_ns) + assert callable(frame_image) + assert isinstance(FacadeSpine, type) + for method_name in ("_classify_state", "_tick_lost", "_emit_transition"): + assert callable(getattr(FacadeSpine, method_name)), method_name + + +# --------------------------------------------------------------------------- +# AC-2 — now_iso ISO-8601 UTC with +00:00 offset. + + +def test_ac2_now_iso_returns_aware_utc_with_plus_offset() -> None: + # Act + stamp = now_iso() + + # Assert + parsed = datetime.fromisoformat(stamp) + assert parsed.utcoffset() is not None, "expected aware datetime" + assert parsed.utcoffset().total_seconds() == 0.0 # type: ignore[union-attr] + assert stamp.endswith("+00:00"), ( + f"expected '+00:00' offset suffix, not 'Z'; got {stamp!r}" + ) + + +# --------------------------------------------------------------------------- +# AC-3 — bias_norm matches the L2 formula. + + +def test_ac3_bias_norm_matches_l2_formula() -> None: + # Arrange + bias = ImuBias(accel_bias=(1.0, 2.0, 2.0), gyro_bias=(0.0, 0.0, 0.0)) + + # Act + result = bias_norm(bias) + + # Assert + assert result == pytest.approx(3.0) + + +def test_ac3_bias_norm_includes_gyro_component() -> None: + # Arrange + bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 4.0, 3.0)) + + # Act + result = bias_norm(bias) + + # Assert + assert result == pytest.approx(5.0) + + +# --------------------------------------------------------------------------- +# AC-4 — se3_from_4x4 returns gtsam.Pose3. + + +def test_ac4_se3_from_4x4_builds_identity_pose() -> None: + # Act + pose = se3_from_4x4(np.eye(4, dtype=np.float64)) + + # Assert + assert isinstance(pose, gtsam.Pose3) + translation = np.asarray(pose.translation()) + assert translation.shape == (3,) + assert np.allclose(translation, np.zeros(3)) + rotation = np.asarray(pose.rotation().matrix()) + assert np.allclose(rotation, np.eye(3)) + + +# --------------------------------------------------------------------------- +# Helpers for AC-5..AC-7 — minimal test-only mixin subclass that +# only sets the attributes the mixin reads. No native binding, no +# config DTO, no real fdr client wiring. + + +class _SpineHarness(FacadeSpine): + def __init__( + self, + *, + fdr: FakeFdrSink, + producer_id: str = "c1_vio.test", + strategy_label: str = "test_strategy", + warm_start_max_frames: int = 5, + feature_threshold: int = 50, + lost_frame_threshold: int = 3, + reported_state: VioState = VioState.INIT, + last_emitted_state: VioState | None = None, + frames_since_warmup: int = 0, + consecutive_lost: int = 0, + latest_bias: ImuBias | None = None, + ) -> None: + self._fdr = fdr # type: ignore[assignment] + self._producer_id = producer_id + self._strategy_label = strategy_label + self._warm_start_max_frames = warm_start_max_frames + self._feature_threshold = feature_threshold + self._lost_frame_threshold = lost_frame_threshold + self._reported_state = reported_state + self._last_emitted_state = last_emitted_state + self._frames_since_warmup = frames_since_warmup + self._consecutive_lost = consecutive_lost + self._latest_bias = latest_bias or ImuBias( + accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0) + ) + + +def _fq(tracked: int) -> FeatureQuality: + return FeatureQuality( + tracked=tracked, new=0, lost=0, mean_parallax=1.0, mre_px=0.5 + ) + + +def _fdr() -> FakeFdrSink: + return FakeFdrSink(producer_id="c1_vio.test", capacity=64) + + +# --------------------------------------------------------------------------- +# AC-5 — _classify_state mirrors the existing logic across all strategies. + + +def test_ac5_classify_state_returns_init_during_warmup() -> None: + # Arrange + spine = _SpineHarness( + fdr=_fdr(), + warm_start_max_frames=5, + feature_threshold=50, + frames_since_warmup=0, + reported_state=VioState.INIT, + ) + + # Act + state = spine._classify_state(_fq(tracked=80)) + + # Assert + assert state == VioState.INIT + + +def test_ac5_classify_state_returns_tracking_after_warmup() -> None: + # Arrange + spine = _SpineHarness( + fdr=_fdr(), + warm_start_max_frames=5, + feature_threshold=50, + frames_since_warmup=5, + reported_state=VioState.INIT, + ) + + # Act + state = spine._classify_state(_fq(tracked=80)) + + # Assert + assert state == VioState.TRACKING + + +def test_ac5_classify_state_returns_degraded_below_threshold() -> None: + # Arrange + spine = _SpineHarness( + fdr=_fdr(), + warm_start_max_frames=5, + feature_threshold=50, + frames_since_warmup=10, + reported_state=VioState.TRACKING, + ) + + # Act + state = spine._classify_state(_fq(tracked=10)) + + # Assert + assert state == VioState.DEGRADED + + +# --------------------------------------------------------------------------- +# AC-6 — _tick_lost transitions correctly. + + +def test_ac6_tick_lost_demotes_tracking_to_degraded_first_call() -> None: + # Arrange + spine = _SpineHarness( + fdr=_fdr(), + lost_frame_threshold=3, + reported_state=VioState.TRACKING, + consecutive_lost=0, + ) + + # Act + spine._tick_lost("frame_42") + + # Assert + assert spine._reported_state == VioState.DEGRADED + assert spine._consecutive_lost == 1 + + +def test_ac6_tick_lost_escalates_to_lost_at_threshold() -> None: + # Arrange + spine = _SpineHarness( + fdr=_fdr(), + lost_frame_threshold=3, + reported_state=VioState.TRACKING, + consecutive_lost=0, + ) + + # Act + spine._tick_lost("frame_42") + spine._tick_lost("frame_43") + spine._tick_lost("frame_44") + + # Assert + assert spine._reported_state == VioState.LOST + assert spine._consecutive_lost == 3 + + +# --------------------------------------------------------------------------- +# AC-7 — _emit_transition emits exactly one FDR record per state change. + + +def test_ac7_emit_transition_no_record_on_steady_state() -> None: + # Arrange + fdr = _fdr() + spine = _SpineHarness( + fdr=fdr, + reported_state=VioState.TRACKING, + last_emitted_state=VioState.TRACKING, + ) + + # Act + spine._emit_transition(VioState.TRACKING, "frame_42") + + # Assert + assert fdr.records == [] + assert spine._last_emitted_state == VioState.TRACKING + + +def test_ac7_emit_transition_one_record_per_state_change() -> None: + # Arrange + fdr = _fdr() + spine = _SpineHarness( + fdr=fdr, + producer_id="c1_vio.test", + strategy_label="test_strategy", + reported_state=VioState.TRACKING, + last_emitted_state=VioState.TRACKING, + consecutive_lost=2, + latest_bias=ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)), + ) + + # Act + spine._emit_transition(VioState.DEGRADED, "frame_42") + + # Assert + assert len(fdr.records) == 1 + record = fdr.records[0] + assert record.kind == "vio.health" + assert record.producer_id == "c1_vio.test" + assert set(record.payload.keys()) == { + "state", + "consecutive_lost", + "bias_norm", + "strategy_label", + "frame_id", + } + assert record.payload["state"] == VioState.DEGRADED.value + assert record.payload["consecutive_lost"] == 2 + assert record.payload["bias_norm"] == pytest.approx(1.0) + assert record.payload["strategy_label"] == "test_strategy" + assert record.payload["frame_id"] == "frame_42" + assert spine._last_emitted_state == VioState.DEGRADED + + +def test_ac7_emit_transition_idempotent_for_repeated_state() -> None: + # Arrange + fdr = _fdr() + spine = _SpineHarness( + fdr=fdr, + reported_state=VioState.TRACKING, + last_emitted_state=VioState.TRACKING, + ) + + # Act + spine._emit_transition(VioState.DEGRADED, "frame_42") + spine._emit_transition(VioState.DEGRADED, "frame_43") + spine._emit_transition(VioState.DEGRADED, "frame_44") + + # Assert + assert len(fdr.records) == 1 + + +# --------------------------------------------------------------------------- +# AC-8 — AST regression guard: no duplicated free-function definitions +# remain in any strategy module. Mirrors the AZ-508 / AZ-526 / AZ-527 +# precedent of AST-based source-asserts so a future strategy author +# cannot silently re-introduce a 4th local copy. + + +@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES) +def test_ac8_no_duplicated_free_functions_remain_in_strategy_module( + strategy_module: str, +) -> None: + # Arrange + src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8") + tree = ast.parse(src) + + # Act + offenders = sorted( + { + node.name + for node in tree.body + if isinstance(node, ast.FunctionDef) and node.name in _FORBIDDEN_FREE_FUNCS + } + ) + + # Assert + assert offenders == [], ( + f"{strategy_module} re-introduced consolidated free functions: " + f"{offenders}. They live in _facade_spine.py — import them from " + f"there instead of re-declaring." + ) + + +# --------------------------------------------------------------------------- +# Risk-1 mitigation — strategies set every attribute the mixin reads. +# AST-based check: each strategy's __init__ writes every required attr +# before any mixin method could be called externally. This is the +# spec's "verify all required attributes set after construction" +# pattern, executed statically so it does not require booting a fake +# native binding for the assertion. + + +_REQUIRED_SPINE_ATTRS: Final[frozenset[str]] = frozenset( + { + "_reported_state", + "_frames_since_warmup", + "_warm_start_max_frames", + "_feature_threshold", + "_consecutive_lost", + "_lost_frame_threshold", + "_last_emitted_state", + "_producer_id", + "_strategy_label", + "_latest_bias", + "_fdr", + } +) + + +@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES) +def test_strategy_init_sets_all_required_spine_attributes( + strategy_module: str, +) -> None: + # Arrange + src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8") + tree = ast.parse(src) + strategy_class = next( + node + for node in ast.walk(tree) + if isinstance(node, ast.ClassDef) and node.name.endswith("Strategy") + ) + init_method = next( + node + for node in strategy_class.body + if isinstance(node, ast.FunctionDef) and node.name == "__init__" + ) + + # Act + assigned_attrs: set[str] = set() + for stmt in ast.walk(init_method): + if not isinstance(stmt, (ast.Assign, ast.AnnAssign)): + continue + targets = stmt.targets if isinstance(stmt, ast.Assign) else [stmt.target] + for target in targets: + if ( + isinstance(target, ast.Attribute) + and isinstance(target.value, ast.Name) + and target.value.id == "self" + ): + assigned_attrs.add(target.attr) + + # Assert + missing = _REQUIRED_SPINE_ATTRS - assigned_attrs + assert not missing, ( + f"{strategy_module}.__init__ does not set {sorted(missing)}; " + f"the FacadeSpine mixin needs every one before any state-machine " + f"method runs." + )