[AZ-528] Consolidate c1_vio strategy facade orchestration spine

Replace 3-way byte-equivalent orchestration-spine duplication across
okvis2.py / vins_mono.py / klt_ransac.py with a single c1-internal
helper at components/c1_vio/_facade_spine.py. Closes cumulative
review batches 52-54 Finding F1. No behaviour change — all existing
AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified (114 c1_vio tests
green, 237 with adjacent regression suite).

The helper exposes 5 stateless free functions (now_iso, bias_norm,
se3_from_4x4, frame_ts_ns, frame_image) and a FacadeSpine mixin
class providing _classify_state / _tick_lost / _emit_transition.
Concrete strategies inherit the mixin and set spine-required
instance attributes in __init__. Mirrors the AZ-527 precedent for
c2_vpr-side _assert_engine_output_dim consolidation.

New test file test_az528_facade_spine.py covers AC-1..AC-8 with 19
tests, including an AST regression guard that prevents future
re-introduction of the consolidated free functions in any strategy
module, plus a Risk-1 static check that every strategy's __init__
assigns every spine-required attribute.

Archive AZ-528 task spec to done/, bump autodev state to batch 56.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 03:03:16 +03:00
parent ac3e288dbd
commit f12789ebf0
9 changed files with 969 additions and 249 deletions
@@ -0,0 +1,97 @@
# Batch 55 — Cycle 1 Report
**Date**: 2026-05-14
**Tasks**: AZ-528 (Hygiene — c1_vio strategy facade orchestration-spine consolidation)
**Verdict**: COMPLETE — PASS
## Summary
Consolidated the 3-way orchestration-spine duplication across the
c1_vio `VioStrategy` modules (`okvis2.py`, `vins_mono.py`,
`klt_ransac.py`) into a single c1-internal helper module
`_facade_spine.py`. Closes cumulative review batches 52-54 Finding
F1 (Medium / Maintainability). Mirrors the AZ-527 precedent for the
c2_vpr-side `_assert_engine_output_dim` consolidation. No behaviour
change — every existing AZ-332 / AZ-333 / AZ-334 AC test passes
unmodified.
## Files added / modified
### Added (2)
- `src/gps_denied_onboard/components/c1_vio/_facade_spine.py` — new
c1-internal helper, 225 lines. Exports 5 free functions (`now_iso`,
`bias_norm`, `se3_from_4x4`, `frame_ts_ns`, `frame_image`) and the
`FacadeSpine` mixin class (`_classify_state`, `_tick_lost`,
`_emit_transition`).
- `tests/unit/c1_vio/test_az528_facade_spine.py` — new test file, 19
tests covering AC-1..AC-8 + a Risk-1 mitigation test (each
strategy's `__init__` statically asserts that every spine-required
attribute is assigned). All pass.
### Modified (3)
- `src/gps_denied_onboard/components/c1_vio/okvis2.py` — inherit from
`FacadeSpine`; import the 5 free functions; delete the 5 local
module-level definitions, 3 instance methods, and the unused
`_BIAS_NORM_FLOOR` constant. Set the 3 spine-required attributes
(`_feature_threshold`, `_producer_id`, `_strategy_label`) in
`__init__`.
- `src/gps_denied_onboard/components/c1_vio/vins_mono.py` — same
pattern as `okvis2.py`.
- `src/gps_denied_onboard/components/c1_vio/klt_ransac.py` — same
pattern; spine threshold reads `self._cfg.min_features_for_pose`
(the KLT-side equivalent of OKVIS2's `degraded_feature_threshold`).
Deferred-consolidation comments referencing this PBI removed.
## Tests
- `tests/unit/c1_vio/test_az528_facade_spine.py` — 19 new tests, all
pass.
- `tests/unit/c1_vio/` (full focused suite) — 120 collected, 114
pass + 6 tier-2 skipped (unchanged from pre-AZ-528 state).
- Adjacent regression suite (`test_az270_compose_root`,
`test_az272_fdr_record_schema`, `test_az273_fdr_client_ringbuf`,
`test_ac1_scaffold_layout`, `tests/unit/c1_vio/`) — 237 pass + 6
skipped.
## AC traceability
| AC | Status | Notes |
|-------|--------|---------------------------------------------------------|
| AC-1 | ✓ | Helper module exposes documented surface. |
| AC-2 | ✓ | `now_iso()` returns aware UTC `+00:00` ISO-8601. |
| AC-3 | ✓ | `bias_norm` matches L2 formula (accel + gyro). |
| AC-4 | ✓ | `se3_from_4x4` builds `gtsam.Pose3` identity correctly. |
| AC-5 | ✓ | `_classify_state` INIT during warmup, T/D thresholds. |
| AC-6 | ✓ | `_tick_lost` demotes T→D first call, escalates to LOST. |
| AC-7 | ✓ | `_emit_transition` exactly one FDR record per change. |
| AC-8 | ✓ | AST guard: 0 module-level free funcs in 3 strategies. |
| AC-9 | ✓ | All AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified. |
| AC-10 | ✓ | AZ-270 layer-lint regression still passes. |
## Code review
See `_docs/03_implementation/reviews/batch_55_review.md` — verdict
PASS, no findings.
## Outcomes & lessons
- Pattern of an "AST regression guard against duplicated free
functions" continues to work well (AZ-508 / AZ-526 / AZ-527 /
AZ-528 all use it). It catches re-introduction by future authors
who don't know the consolidated home exists.
- Mixin-via-instance-attributes (rather than constructor parameters)
keeps strategy `__init__` signatures stable — the AZ-331 factory
signature `(config, *, fdr_client)` is preserved across all three
strategies post-refactor.
- A 3-point hygiene PBI was the right granularity: 1 new module,
3 small edits to existing strategies, 1 test file with AST + import
regression guards. Same shape and complexity as AZ-527.
## Outstanding
None for AZ-528. F2 from cumulative review batches 52-54 (test fake
+ `_patch_pose_recovery` helper consolidation) remains deferred —
sits at a different abstraction layer and will ride a later hygiene
pass.
@@ -0,0 +1,145 @@
# Code Review Report
**Batch**: 55 (AZ-528 — c1_vio strategy facade orchestration-spine consolidation)
**Date**: 2026-05-14
**Verdict**: PASS
## Scope
Single-task hygiene batch closing Finding F1 from
`_docs/03_implementation/cumulative_review_batches_52-54_cycle1_report.md`.
Replaces the 3-way byte-equivalent orchestration-spine duplication
across `okvis2.py`, `vins_mono.py`, `klt_ransac.py` with a single
c1-internal helper module `_facade_spine.py`. No behaviour change;
existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified.
### Changed files
- `src/gps_denied_onboard/components/c1_vio/_facade_spine.py` — NEW
c1-internal helper. Exports 5 stateless free functions (`now_iso`,
`bias_norm`, `se3_from_4x4`, `frame_ts_ns`, `frame_image`) and one
mixin class (`FacadeSpine`) providing `_classify_state`,
`_tick_lost`, `_emit_transition`. 225 lines including docstring +
attribute declarations.
- `src/gps_denied_onboard/components/c1_vio/okvis2.py` — inherit from
`FacadeSpine`; import the 5 free functions; delete the 5 local
module-level definitions + 3 instance methods + unused
`_BIAS_NORM_FLOOR` constant; set 3 new spine-required attributes in
`__init__` (`_feature_threshold`, `_producer_id`, `_strategy_label`).
- `src/gps_denied_onboard/components/c1_vio/vins_mono.py` — same
pattern as `okvis2.py`.
- `src/gps_denied_onboard/components/c1_vio/klt_ransac.py` — same
pattern; reuses `_grayscale` (KLT/RANSAC-specific, not consolidated).
Spine threshold attribute reads `_cfg.min_features_for_pose` (the
KLT-side equivalent of OKVIS2's `degraded_feature_threshold`).
- `tests/unit/c1_vio/test_az528_facade_spine.py` — NEW. 19 tests
covering AC-1..AC-8 + a Risk-1 mitigation test that statically
verifies every concrete strategy's `__init__` sets every
spine-required attribute.
## Phase 2 — Spec Compliance
| AC | Test | Verified |
|-------|-----------------------------------------------------------------------------------------------|----------|
| AC-1 | `test_ac1_helper_module_exposes_documented_surface` | ✓ |
| AC-2 | `test_ac2_now_iso_returns_aware_utc_with_plus_offset` | ✓ |
| AC-3 | `test_ac3_bias_norm_matches_l2_formula` + `test_ac3_bias_norm_includes_gyro_component` | ✓ |
| AC-4 | `test_ac4_se3_from_4x4_builds_identity_pose` | ✓ |
| AC-5 | `test_ac5_classify_state_returns_init_during_warmup` + tracking + degraded | ✓ |
| AC-6 | `test_ac6_tick_lost_demotes_tracking_to_degraded_first_call` + escalates-to-lost | ✓ |
| AC-7 | `test_ac7_emit_transition_no_record_on_steady_state` + one-record-per-change + idempotent | ✓ |
| AC-8 | `test_ac8_no_duplicated_free_functions_remain_in_strategy_module` (parametrised over 3 files) | ✓ |
| AC-9 | Existing AZ-332 / AZ-333 / AZ-334 AC tests — all unmodified, all pass | ✓ |
| AC-10 | `tests/unit/test_az270_compose_root.py::test_ac6_only_compose_root_imports_concrete_strategies` | ✓ |
Test counts:
- `tests/unit/c1_vio/test_az528_facade_spine.py` — 19 tests, all pass.
- `tests/unit/c1_vio/` total — 120 collected, 114 pass + 6 tier-2
skipped (unchanged from pre-AZ-528 state).
- Adjacent regression suite (`test_az270_compose_root`, `test_az272_fdr_record_schema`,
`test_az273_fdr_client_ringbuf`, `test_ac1_scaffold_layout`,
`tests/unit/c1_vio/`) — 237 pass + 6 skipped.
Scope discipline: only the 5 files in the task spec's "Included"
list were touched; no test files outside the new
`test_az528_facade_spine.py` were modified (AC-9 hard requirement).
## Phase 3 — Code Quality
- **SRP**: `FacadeSpine` owns one responsibility — the c1_vio
strategy state machine + FDR `vio.health` record emission. Free
functions are stateless pure utilities. Each concrete strategy
retains its geometry pipeline + native-binding driver. Clean split.
- **Naming**: instance attributes the mixin reads are documented in
the class docstring; type annotations on the class declare them for
IDE / type-checker consumption (Risk 1 mitigation).
- **Comments**: deferred-consolidation comments referencing this PBI
(`# post-AZ-334 hygiene PBI ...`) were removed from
`klt_ransac.py` as required by the task spec.
- **Dead code**: removed `_BIAS_NORM_FLOOR` from `okvis2.py` (verified
via grep; not used anywhere in the c1_vio component or
cross-component).
- **DRY**: the consolidation eliminates ~120 lines of duplicated code
across the three strategy modules.
- **Test quality**: every test follows Arrange / Act / Assert; AC
tests use parametrisation where appropriate (AC-8 over the 3
strategy modules).
## Phase 4 — Security Quick-Scan
N/A — pure refactor, no new inputs, no I/O, no string interpolation
into queries, no subprocess invocation. The free functions take only
typed DTOs (`ImuBias`, `NavCameraFrame`, `np.ndarray`); the mixin
methods only mutate instance state.
## Phase 5 — Performance
No hot-path regression. The mixin call sites are byte-equivalent to
the previous in-line definitions (same conditionals, same
`FdrClient.enqueue` invocation, same string interpolation in
`bias_norm`). The single new indirection is method dispatch through
the mixin's MRO — Python class-method resolution is O(1) on a fixed
MRO and is amortised inside the hot-path's existing call overhead.
## Phase 6 — Cross-Task Consistency
Single-task batch — N/A.
## Phase 7 — Architecture Compliance
- **Layer direction**: `_facade_spine.py` imports only from L1
substrate (`_types.nav`, `components.c1_vio.errors`,
`fdr_client.records`) + stdlib (`datetime`, `math`) + numpy. No
imports from a higher layer; no imports from a sibling component.
- **Public API respect**: `_facade_spine` is underscore-prefixed and
NOT in `c1_vio/__init__.py`'s `__all__`. AZ-270's
`test_ac6_only_compose_root_imports_concrete_strategies`
regression-gate passes. Mirrors the AZ-527 precedent for
`_assert_engine_output_dim` in `c2_vpr`.
- **Cycles**: no new cyclic dependencies. `_facade_spine` is a leaf
inside `c1_vio`; the 3 strategy modules already imported from
`c1_vio.errors`, so the new sibling import slots in cleanly.
- **Duplicate symbols**: AC-8 AST regression guard asserts the 5
consolidated free-function names appear only in `_facade_spine.py`.
- **Cross-cutting concerns**: the spine is a c1-internal concern by
task-spec decision (component state machines + per-strategy FDR
record producers do not generalise across c2..c5; AZ-507 carved
that boundary intentionally). NOT hoisted to `shared/helpers/`.
## Findings
None.
## Conclusion
PASS. The consolidation removes ~120 lines of duplication, preserves
behaviour across all three strategies (verified by 237 passing tests
in the focused + adjacent suite), and respects the c1_vio component
boundary. The cumulative review batches 52-54 Finding F1 is closed.
Risk 4 (mixin couples future divergence) is mitigated by the task
spec's documented escape hatch: future divergent strategies can
either override mixin methods or use the free functions in isolation.
The 3 current strategies' state machines are byte-equivalent
post-refactor, so the mixin is the right shape today.
+2 -2
View File
@@ -12,6 +12,6 @@ sub_step:
retry_count: 0
cycle: 1
tracker: jira
last_completed_batch: 54
last_completed_batch: 55
last_cumulative_review: batches_52-54
current_batch: 55
current_batch: 56
@@ -0,0 +1,224 @@
"""c1_vio strategy facade orchestration spine — c1-internal helpers (AZ-528).
Shared between :class:`Okvis2Strategy` (AZ-332),
:class:`VinsMonoStrategy` (AZ-333), and :class:`KltRansacStrategy`
(AZ-334). Closes cumulative review batches 52-54 Finding F1: the
orchestration-spine duplication across the c1_vio strategy triplet.
Mirrors the AZ-527 precedent for the c2_vpr-side consolidation —
underscore-prefixed module name keeps the helper c1-internal (NOT in
``c1_vio/__init__.py``'s Public API surface); concrete strategies
import it as a sibling. Engine output-shape contracts and VIO state
machines are component-scoped concerns, NOT cross-component
``shared/helpers/`` concerns.
The helper module exposes:
Free functions (stateless, pure):
- :func:`now_iso` — ISO-8601 UTC timestamp for FDR record ``ts``.
- :func:`bias_norm` — L2 norm of the concatenated ``(accel || gyro)``
6-vector.
- :func:`se3_from_4x4` — lazy ``gtsam.Pose3`` construction from a
4x4 row-major matrix.
- :func:`frame_ts_ns` — UTC-epoch nanoseconds from
:class:`NavCameraFrame` (OKVIS2 + VINS-Mono only; KLT/RANSAC owns
inline grayscale conversion via its geometry pipeline).
- :func:`frame_image` — contiguous ``uint8`` ndarray with 2D/3D
shape validation. ``producer_id`` is parametrised so the error
message names the originating strategy.
Mixin (:class:`FacadeSpine`) provides the state-machine + FDR
``vio.health`` record emit. Concrete strategies inherit from it
and set the per-instance attributes the mixin reads:
- ``_reported_state``, ``_frames_since_warmup``,
``_warm_start_max_frames``, ``_feature_threshold`` — used by
:meth:`FacadeSpine._classify_state`.
- ``_consecutive_lost``, ``_lost_frame_threshold`` — used by
:meth:`FacadeSpine._tick_lost`.
- ``_last_emitted_state``, ``_producer_id``, ``_strategy_label``,
``_latest_bias``, ``_fdr`` — used by
:meth:`FacadeSpine._emit_transition`.
The mixin's required attributes are declared as class-level type
annotations (no default values) so type-checkers / IDEs catch a
strategy that forgets to set one in ``__init__`` — runtime safety
is the AZ-528 AC-9 + AC-10 regression-guard tests that exercise
the three concrete strategies end-to-end through the consolidated
spine.
"""
from __future__ import annotations
import math
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
import numpy as np
from gps_denied_onboard._types.nav import (
FeatureQuality,
ImuBias,
VioState,
)
from gps_denied_onboard.components.c1_vio.errors import VioFatalError
from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord
if TYPE_CHECKING:
import numpy.typing as npt
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard.fdr_client.client import FdrClient
__all__ = [
"FacadeSpine",
"bias_norm",
"frame_image",
"frame_ts_ns",
"now_iso",
"se3_from_4x4",
]
def now_iso() -> str:
"""ISO-8601 UTC timestamp for FDR record ``ts``.
Returns the format used by all three c1_vio strategies' FDR
record emissions (``+00:00`` offset suffix). This is NOT the
``Z``-suffix canonical form used by
:func:`gps_denied_onboard.helpers.iso_timestamps.iso_ts_from_clock`
(AZ-526) — that helper serves clock-injected FDR timestamps
from a :class:`~gps_denied_onboard.clock.Clock`, while this
helper is the strategy-facade's own wall-clock stamp at state-
transition emit time.
"""
return datetime.now(timezone.utc).isoformat()
def bias_norm(bias: ImuBias) -> float:
"""L2 norm of the concatenated ``(accel_bias || gyro_bias)`` 6-vector."""
accel = np.asarray(bias.accel_bias, dtype=np.float64)
gyro = np.asarray(bias.gyro_bias, dtype=np.float64)
return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro))))
def se3_from_4x4(matrix: npt.NDArray[Any]) -> Any:
"""Build a ``gtsam.Pose3`` from a 4x4 row-major matrix.
Imported lazily so this module can be imported without gtsam in
headless tooling paths (tests + facade-only smoke).
"""
import gtsam
return gtsam.Pose3(np.asarray(matrix, dtype=np.float64))
def frame_ts_ns(frame: NavCameraFrame) -> int:
"""Convert :attr:`NavCameraFrame.timestamp` to monotonic ns.
Uses the datetime's UTC epoch nanoseconds so the value is
monotonically increasing across frames (frame source guarantees
strictly increasing timestamps per the FrameSource contract).
"""
return int(frame.timestamp.timestamp() * 1e9)
def frame_image(frame: NavCameraFrame, *, producer_id: str) -> np.ndarray:
"""Coerce the frame's image into a contiguous ``uint8`` ndarray.
Raises :class:`VioFatalError` tagged with ``producer_id`` when
the frame's image is not 2-D or 3-D.
"""
arr = np.ascontiguousarray(frame.image, dtype=np.uint8)
if arr.ndim < 2 or arr.ndim > 3:
raise VioFatalError(
f"{producer_id}: NavCameraFrame.image must be 2-D or 3-D; got {arr.ndim}-D"
)
return arr
class FacadeSpine:
"""Orchestration-spine mixin for c1_vio :class:`VioStrategy` implementations.
Implements three state-machine helpers that were previously duplicated
across :class:`Okvis2Strategy`, :class:`VinsMonoStrategy`, and
:class:`KltRansacStrategy`:
- :meth:`_classify_state` — INIT during warmup, DEGRADED below
the per-strategy feature threshold, TRACKING otherwise.
- :meth:`_tick_lost` — increment the consecutive-lost counter,
escalate to LOST at ``_lost_frame_threshold``, demote
TRACKING → DEGRADED on the first lost frame.
- :meth:`_emit_transition` — emit exactly one FDR ``vio.health``
record per state change; no record on steady-state.
Concrete strategies MUST set the following attributes (typically
in ``__init__``) before any of the mixin methods are called:
- ``_reported_state: VioState``
- ``_frames_since_warmup: int``
- ``_warm_start_max_frames: int``
- ``_feature_threshold: int``
- ``_consecutive_lost: int``
- ``_lost_frame_threshold: int``
- ``_last_emitted_state: VioState | None``
- ``_producer_id: str`` — FDR record ``producer_id`` field
(e.g. ``"c1_vio.okvis2"``).
- ``_strategy_label: str`` — FDR payload ``strategy_label``
field (e.g. ``"okvis2"``).
- ``_latest_bias: ImuBias``
- ``_fdr: FdrClient``
Type annotations on the class declare the attributes for static
type-checkers; missing assignments at runtime surface as
``AttributeError`` at the first state-machine call site.
"""
_reported_state: VioState
_frames_since_warmup: int
_warm_start_max_frames: int
_feature_threshold: int
_consecutive_lost: int
_lost_frame_threshold: int
_last_emitted_state: VioState | None
_producer_id: str
_strategy_label: str
_latest_bias: ImuBias
_fdr: FdrClient
def _classify_state(self, fq: FeatureQuality) -> VioState:
if self._reported_state == VioState.INIT and (
self._frames_since_warmup + 1 < self._warm_start_max_frames
):
return VioState.INIT
if fq.tracked < self._feature_threshold:
return VioState.DEGRADED
return VioState.TRACKING
def _tick_lost(self, frame_id: str) -> None:
self._consecutive_lost += 1
if self._consecutive_lost >= self._lost_frame_threshold:
self._reported_state = VioState.LOST
elif self._reported_state == VioState.TRACKING:
self._reported_state = VioState.DEGRADED
def _emit_transition(self, new_state: VioState, frame_id: str) -> None:
if self._last_emitted_state == new_state:
return
self._last_emitted_state = new_state
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=now_iso(),
producer_id=self._producer_id,
kind="vio.health",
payload={
"state": new_state.value,
"consecutive_lost": self._consecutive_lost,
"bias_norm": bias_norm(self._latest_bias),
"strategy_label": self._strategy_label,
"frame_id": frame_id,
},
)
self._fdr.enqueue(record)
@@ -74,8 +74,6 @@ Risk mitigations (see task spec for full text):
from __future__ import annotations
import math
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Final, Literal
import cv2
@@ -89,11 +87,15 @@ from gps_denied_onboard._types.nav import (
VioState,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c1_vio._facade_spine import (
FacadeSpine,
bias_norm,
se3_from_4x4,
)
from gps_denied_onboard.components.c1_vio.errors import (
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord
from gps_denied_onboard.helpers.imu_preintegrator import (
ImuPreintegrationError,
ImuPreintegrator,
@@ -133,28 +135,6 @@ _ESSENTIAL_MATRIX_DOF: Final[int] = 5
_INIT_STATE_COVARIANCE_SCALAR: Final[float] = 10.0
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _bias_norm(bias: ImuBias) -> float:
"""L2 norm of the concatenated 6-vector ``(accel || gyro)``."""
accel = np.asarray(bias.accel_bias, dtype=np.float64)
gyro = np.asarray(bias.gyro_bias, dtype=np.float64)
return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro))))
def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any:
"""Build a ``gtsam.Pose3`` from a 4x4 row-major matrix.
Imported lazily so this module can be imported without gtsam in
headless tooling paths (tests + facade-only smoke).
"""
import gtsam
return gtsam.Pose3(np.asarray(matrix, dtype=np.float64))
def _grayscale(image: npt.NDArray[Any]) -> npt.NDArray[Any]:
"""Coerce a NavCameraFrame image to OpenCV's expected 2D ``uint8``.
@@ -204,7 +184,7 @@ def _intrinsics_3x3(calibration: CameraCalibration) -> np.ndarray:
return K
class KltRansacStrategy:
class KltRansacStrategy(FacadeSpine):
"""Mandatory simple-baseline :class:`VioStrategy` for E-C1 (AZ-334).
Constructor matches the AZ-331 composition-root factory shape::
@@ -244,6 +224,9 @@ class KltRansacStrategy:
self._lost_frame_threshold: int = c1_block.lost_frame_threshold
self._warm_start_max_frames: int = c1_block.warm_start_max_frames
self._cfg: KltRansacConfig = c1_block.klt_ransac
self._feature_threshold: int = self._cfg.min_features_for_pose
self._producer_id: str = _PRODUCER_ID
self._strategy_label: str = _STRATEGY_LABEL
# Per-frame state.
self._prev_gray: np.ndarray | None = None
@@ -404,7 +387,7 @@ class KltRansacStrategy:
pose_4x4 = np.eye(4, dtype=np.float64)
pose_4x4[:3, :3] = np.asarray(R, dtype=np.float64)
pose_4x4[:3, 3] = np.asarray(t, dtype=np.float64).flatten()
pose = _se3_from_4x4(pose_4x4)
pose = se3_from_4x4(pose_4x4)
# 8. Final inlier count for state classification + covariance.
final_inlier_count = (
@@ -529,7 +512,7 @@ class KltRansacStrategy:
return VioHealth(
state=self._reported_state,
consecutive_lost=self._consecutive_lost,
bias_norm=_bias_norm(self._latest_bias),
bias_norm=bias_norm(self._latest_bias),
)
def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]:
@@ -614,7 +597,7 @@ class KltRansacStrategy:
as un-informed (the warm-start hint, not this VioOutput, is
what seeds C5's initial estimate).
"""
identity_pose = _se3_from_4x4(np.eye(4, dtype=np.float64))
identity_pose = se3_from_4x4(np.eye(4, dtype=np.float64))
cov = np.eye(6, dtype=np.float64) * _INIT_STATE_COVARIANCE_SCALAR
fq = FeatureQuality(
tracked=int(self._prev_features.shape[0]) if self._prev_features is not None else 0,
@@ -715,52 +698,6 @@ class KltRansacStrategy:
scalar = (sigma_sq + inlier_penalty) / dof
return np.eye(6, dtype=np.float64) * scalar
def _classify_state(self, fq: FeatureQuality) -> VioState:
"""Map per-frame feature quality + warmup to a :class:`VioState`."""
if self._reported_state == VioState.INIT and (
self._frames_since_warmup + 1 < self._warm_start_max_frames
):
return VioState.INIT
if fq.tracked < self._cfg.min_features_for_pose:
return VioState.DEGRADED
return VioState.TRACKING
def _tick_lost(self, frame_id_str: str) -> None:
"""Tick the lost-frame counter + transition state if needed.
Mirrors :class:`Okvis2Strategy._tick_lost` exactly; the
post-AZ-334 hygiene PBI (Batch 53 review F1) will consolidate.
"""
self._consecutive_lost += 1
if self._consecutive_lost >= self._lost_frame_threshold:
self._reported_state = VioState.LOST
elif self._reported_state == VioState.TRACKING:
self._reported_state = VioState.DEGRADED
def _emit_transition(self, new_state: VioState, frame_id: str) -> None:
"""Emit an FDR ``vio.health`` record IFF the state changed (AC-10).
Steady-state frames produce no record — only transitions
through (INIT, TRACKING, DEGRADED, LOST) are FDR-stamped.
"""
if self._last_emitted_state == new_state:
return
self._last_emitted_state = new_state
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_now_iso(),
producer_id=_PRODUCER_ID,
kind="vio.health",
payload={
"state": new_state.value,
"consecutive_lost": self._consecutive_lost,
"bias_norm": _bias_norm(self._latest_bias),
"strategy_label": _STRATEGY_LABEL,
"frame_id": frame_id,
},
)
self._fdr.enqueue(record)
def _safe_float(value: float) -> float:
"""NaN-safe float coercion — RansacFilter returns NaN for empty inliers."""
@@ -44,8 +44,6 @@ AC mapping (see ``_docs/02_tasks/todo/AZ-332_c1_okvis2_strategy.md``):
from __future__ import annotations
import math
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Final, Literal
import numpy as np
@@ -58,16 +56,20 @@ from gps_denied_onboard._types.nav import (
VioState,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c1_vio._facade_spine import (
FacadeSpine,
bias_norm,
frame_image,
frame_ts_ns,
se3_from_4x4,
)
from gps_denied_onboard.components.c1_vio.errors import (
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
import numpy.typing as npt
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuWindow,
@@ -85,32 +87,9 @@ __all__ = ["Okvis2Strategy"]
_STRATEGY_LABEL: Final[Literal["okvis2"]] = "okvis2"
_PRODUCER_ID: Final[str] = "c1_vio.okvis2"
_LOGGER_COMPONENT: Final[str] = "c1_vio.okvis2"
_BIAS_NORM_FLOOR: Final[float] = 0.0
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _bias_norm(bias: ImuBias) -> float:
"""L2 norm of the concatenated 6-vector ``(accel || gyro)``."""
accel = np.asarray(bias.accel_bias, dtype=np.float64)
gyro = np.asarray(bias.gyro_bias, dtype=np.float64)
return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro))))
def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any:
"""Build a ``gtsam.Pose3`` from a 4x4 row-major matrix.
Imported lazily so this module can be imported without gtsam in
headless tooling paths (tests + facade-only smoke).
"""
import gtsam
return gtsam.Pose3(np.asarray(matrix, dtype=np.float64))
class Okvis2Strategy:
class Okvis2Strategy(FacadeSpine):
"""Production-default :class:`VioStrategy` for E-C1 (AZ-332).
Constructor matches the AZ-331 composition-root factory shape::
@@ -147,6 +126,9 @@ class Okvis2Strategy:
self._lost_frame_threshold: int = c1_block.lost_frame_threshold
self._warm_start_max_frames: int = c1_block.warm_start_max_frames
self._okvis2_cfg: Okvis2Config = c1_block.okvis2
self._feature_threshold: int = self._okvis2_cfg.degraded_feature_threshold
self._producer_id: str = _PRODUCER_ID
self._strategy_label: str = _STRATEGY_LABEL
self._calibration: CameraCalibration | None = None
self._frames_since_warmup: int = 0
self._consecutive_lost: int = 0
@@ -202,7 +184,9 @@ class Okvis2Strategy:
try:
self._push_imu_window(imu)
produced = self._backend.add_frame(
frame_id_str, _frame_ts_ns(frame), _frame_image(frame)
frame_id_str,
frame_ts_ns(frame),
frame_image(frame, producer_id="Okvis2Strategy"),
)
except self._binding_module.OkvisInitException as exc:
self._emit_transition(VioState.INIT, frame_id_str)
@@ -319,7 +303,7 @@ class Okvis2Strategy:
return VioHealth(
state=self._reported_state,
consecutive_lost=self._consecutive_lost,
bias_norm=_bias_norm(self._latest_bias),
bias_norm=bias_norm(self._latest_bias),
)
def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]:
@@ -400,7 +384,7 @@ class Okvis2Strategy:
def _build_vio_output(self, raw: dict[str, Any], emitted_at_ns: int) -> VioOutput:
try:
pose = _se3_from_4x4(raw["pose_T_world_body"])
pose = se3_from_4x4(raw["pose_T_world_body"])
cov = np.asarray(raw["pose_covariance_6x6"], dtype=np.float64)
bias = ImuBias(
accel_bias=tuple(float(x) for x in raw["accel_bias"]), # type: ignore[arg-type]
@@ -432,57 +416,3 @@ class Okvis2Strategy:
emitted_at_ns=backend_ts,
)
def _classify_state(self, fq: FeatureQuality) -> VioState:
if self._reported_state == VioState.INIT and (
self._frames_since_warmup + 1 < self._warm_start_max_frames
):
return VioState.INIT
if fq.tracked < self._okvis2_cfg.degraded_feature_threshold:
return VioState.DEGRADED
return VioState.TRACKING
def _tick_lost(self, frame_id: str) -> None:
self._consecutive_lost += 1
if self._consecutive_lost >= self._lost_frame_threshold:
self._reported_state = VioState.LOST
elif self._reported_state == VioState.TRACKING:
self._reported_state = VioState.DEGRADED
def _emit_transition(self, new_state: VioState, frame_id: str) -> None:
if self._last_emitted_state == new_state:
return
self._last_emitted_state = new_state
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_now_iso(),
producer_id=_PRODUCER_ID,
kind="vio.health",
payload={
"state": new_state.value,
"consecutive_lost": self._consecutive_lost,
"bias_norm": _bias_norm(self._latest_bias),
"strategy_label": _STRATEGY_LABEL,
"frame_id": frame_id,
},
)
self._fdr.enqueue(record)
def _frame_ts_ns(frame: NavCameraFrame) -> int:
"""Convert ``NavCameraFrame.timestamp`` to monotonic-ns.
Uses the datetime's UTC epoch nanoseconds so the value is
monotonically increasing across frames (frame source guarantees
strictly increasing timestamps per the FrameSource contract).
"""
return int(frame.timestamp.timestamp() * 1e9)
def _frame_image(frame: NavCameraFrame) -> np.ndarray:
"""Coerce the frame's image into a contiguous uint8 ndarray."""
arr = np.ascontiguousarray(frame.image, dtype=np.uint8)
if arr.ndim < 2 or arr.ndim > 3:
raise VioFatalError(
f"Okvis2Strategy: NavCameraFrame.image must be 2-D or 3-D; got {arr.ndim}-D"
)
return arr
@@ -53,8 +53,6 @@ AC mapping (see ``_docs/02_tasks/todo/AZ-333_c1_vins_mono_strategy.md``):
from __future__ import annotations
import math
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Final, Literal
import numpy as np
@@ -67,16 +65,20 @@ from gps_denied_onboard._types.nav import (
VioState,
)
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c1_vio._facade_spine import (
FacadeSpine,
bias_norm,
frame_image,
frame_ts_ns,
se3_from_4x4,
)
from gps_denied_onboard.components.c1_vio.errors import (
VioFatalError,
VioInitializingError,
)
from gps_denied_onboard.fdr_client.records import CURRENT_SCHEMA_VERSION, FdrRecord
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
import numpy.typing as npt
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import (
ImuWindow,
@@ -96,29 +98,7 @@ _PRODUCER_ID: Final[str] = "c1_vio.vins_mono"
_LOGGER_COMPONENT: Final[str] = "c1_vio.vins_mono"
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _bias_norm(bias: ImuBias) -> float:
"""L2 norm of the concatenated 6-vector ``(accel || gyro)``."""
accel = np.asarray(bias.accel_bias, dtype=np.float64)
gyro = np.asarray(bias.gyro_bias, dtype=np.float64)
return float(math.sqrt(float(np.dot(accel, accel) + np.dot(gyro, gyro))))
def _se3_from_4x4(matrix: npt.NDArray[Any]) -> Any:
"""Build a ``gtsam.Pose3`` from a 4x4 row-major matrix.
Imported lazily so this module can be imported without gtsam in
headless tooling paths (tests + facade-only smoke).
"""
import gtsam
return gtsam.Pose3(np.asarray(matrix, dtype=np.float64))
class VinsMonoStrategy:
class VinsMonoStrategy(FacadeSpine):
"""Research-only :class:`VioStrategy` for IT-12 comparative study (AZ-333).
Constructor matches the AZ-331 composition-root factory shape::
@@ -157,6 +137,9 @@ class VinsMonoStrategy:
self._lost_frame_threshold: int = c1_block.lost_frame_threshold
self._warm_start_max_frames: int = c1_block.warm_start_max_frames
self._vins_cfg: VinsMonoConfig = c1_block.vins_mono
self._feature_threshold: int = self._vins_cfg.degraded_feature_threshold
self._producer_id: str = _PRODUCER_ID
self._strategy_label: str = _STRATEGY_LABEL
self._calibration: CameraCalibration | None = None
self._frames_since_warmup: int = 0
self._consecutive_lost: int = 0
@@ -216,7 +199,9 @@ class VinsMonoStrategy:
try:
self._push_imu_window(imu)
produced = self._backend.add_frame(
frame_id_str, _frame_ts_ns(frame), _frame_image(frame)
frame_id_str,
frame_ts_ns(frame),
frame_image(frame, producer_id="VinsMonoStrategy"),
)
except self._binding_module.VinsMonoInitException as exc:
self._emit_transition(VioState.INIT, frame_id_str)
@@ -338,7 +323,7 @@ class VinsMonoStrategy:
return VioHealth(
state=self._reported_state,
consecutive_lost=self._consecutive_lost,
bias_norm=_bias_norm(self._latest_bias),
bias_norm=bias_norm(self._latest_bias),
)
def current_strategy_label(self) -> Literal["okvis2", "vins_mono", "klt_ransac"]:
@@ -426,7 +411,7 @@ class VinsMonoStrategy:
def _build_vio_output(self, raw: dict[str, Any], emitted_at_ns: int) -> VioOutput:
try:
pose = _se3_from_4x4(raw["pose_T_world_body"])
pose = se3_from_4x4(raw["pose_T_world_body"])
cov = np.asarray(raw["pose_covariance_6x6"], dtype=np.float64)
bias = ImuBias(
accel_bias=tuple(float(x) for x in raw["accel_bias"]), # type: ignore[arg-type]
@@ -461,58 +446,3 @@ class VinsMonoStrategy:
emitted_at_ns=backend_ts,
)
def _classify_state(self, fq: FeatureQuality) -> VioState:
if self._reported_state == VioState.INIT and (
self._frames_since_warmup + 1 < self._warm_start_max_frames
):
return VioState.INIT
if fq.tracked < self._vins_cfg.degraded_feature_threshold:
return VioState.DEGRADED
return VioState.TRACKING
def _tick_lost(self, frame_id: str) -> None:
self._consecutive_lost += 1
if self._consecutive_lost >= self._lost_frame_threshold:
self._reported_state = VioState.LOST
elif self._reported_state == VioState.TRACKING:
self._reported_state = VioState.DEGRADED
def _emit_transition(self, new_state: VioState, frame_id: str) -> None:
if self._last_emitted_state == new_state:
return
self._last_emitted_state = new_state
record = FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_now_iso(),
producer_id=_PRODUCER_ID,
kind="vio.health",
payload={
"state": new_state.value,
"consecutive_lost": self._consecutive_lost,
"bias_norm": _bias_norm(self._latest_bias),
"strategy_label": _STRATEGY_LABEL,
"frame_id": frame_id,
},
)
self._fdr.enqueue(record)
def _frame_ts_ns(frame: NavCameraFrame) -> int:
"""Convert ``NavCameraFrame.timestamp`` to monotonic-ns.
Uses the datetime's UTC epoch nanoseconds so the value is
monotonically increasing across frames (frame source guarantees
strictly increasing timestamps per the FrameSource contract).
"""
return int(frame.timestamp.timestamp() * 1e9)
def _frame_image(frame: NavCameraFrame) -> np.ndarray:
"""Coerce the frame's image into a contiguous uint8 ndarray."""
arr = np.ascontiguousarray(frame.image, dtype=np.uint8)
if arr.ndim < 2 or arr.ndim > 3:
raise VioFatalError(
f"VinsMonoStrategy: NavCameraFrame.image must be 2-D or 3-D; "
f"got {arr.ndim}-D"
)
return arr
@@ -0,0 +1,457 @@
"""AZ-528 — c1_vio facade orchestration-spine consolidation tests.
Covers AC-1..AC-8 of
``_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md``:
- AC-1: helper module exposes the documented surface.
- AC-2: ``now_iso()`` returns an aware UTC ISO-8601 timestamp with
``+00:00`` offset (NOT the ``Z``-suffix variant — that is
``iso_ts_from_clock`` in AZ-526).
- AC-3: ``bias_norm`` matches the L2 formula on a hand-checked vector.
- AC-4: ``se3_from_4x4`` builds a ``gtsam.Pose3`` with the expected
identity rotation + zero translation when fed ``np.eye(4)``.
- AC-5: ``FacadeSpine._classify_state`` returns INIT during warm-up,
TRACKING above the threshold, DEGRADED below it.
- AC-6: ``FacadeSpine._tick_lost`` demotes TRACKING → DEGRADED on the
first lost frame and escalates to LOST at the threshold.
- AC-7: ``FacadeSpine._emit_transition`` emits exactly one
``vio.health`` FDR record per state change (no record on
steady-state).
- AC-8: AST-walk regression guard — zero module-level definitions of
the consolidated free functions remain in any of the three strategy
modules. Mirrors the AZ-508 / AZ-526 / AZ-527 pattern.
AC-9 (existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified) and
AC-10 (AZ-270 layer lint) ride the existing test files; this module
does not re-stage them.
"""
from __future__ import annotations
import ast
from datetime import datetime
from pathlib import Path
from typing import Final
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.nav import FeatureQuality, ImuBias, VioState
from gps_denied_onboard.components.c1_vio._facade_spine import (
FacadeSpine,
bias_norm,
frame_image,
frame_ts_ns,
now_iso,
se3_from_4x4,
)
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
_C1_VIO_SRC: Final[Path] = (
Path(__file__).resolve().parents[3]
/ "src"
/ "gps_denied_onboard"
/ "components"
/ "c1_vio"
)
_STRATEGY_MODULES: Final[tuple[str, ...]] = (
"okvis2.py",
"vins_mono.py",
"klt_ransac.py",
)
_FORBIDDEN_FREE_FUNCS: Final[frozenset[str]] = frozenset(
{"_now_iso", "_bias_norm", "_se3_from_4x4", "_frame_ts_ns", "_frame_image"}
)
# ---------------------------------------------------------------------------
# AC-1 — surface.
def test_ac1_helper_module_exposes_documented_surface() -> None:
# Assert
assert callable(now_iso)
assert callable(bias_norm)
assert callable(se3_from_4x4)
assert callable(frame_ts_ns)
assert callable(frame_image)
assert isinstance(FacadeSpine, type)
for method_name in ("_classify_state", "_tick_lost", "_emit_transition"):
assert callable(getattr(FacadeSpine, method_name)), method_name
# ---------------------------------------------------------------------------
# AC-2 — now_iso ISO-8601 UTC with +00:00 offset.
def test_ac2_now_iso_returns_aware_utc_with_plus_offset() -> None:
# Act
stamp = now_iso()
# Assert
parsed = datetime.fromisoformat(stamp)
assert parsed.utcoffset() is not None, "expected aware datetime"
assert parsed.utcoffset().total_seconds() == 0.0 # type: ignore[union-attr]
assert stamp.endswith("+00:00"), (
f"expected '+00:00' offset suffix, not 'Z'; got {stamp!r}"
)
# ---------------------------------------------------------------------------
# AC-3 — bias_norm matches the L2 formula.
def test_ac3_bias_norm_matches_l2_formula() -> None:
# Arrange
bias = ImuBias(accel_bias=(1.0, 2.0, 2.0), gyro_bias=(0.0, 0.0, 0.0))
# Act
result = bias_norm(bias)
# Assert
assert result == pytest.approx(3.0)
def test_ac3_bias_norm_includes_gyro_component() -> None:
# Arrange
bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 4.0, 3.0))
# Act
result = bias_norm(bias)
# Assert
assert result == pytest.approx(5.0)
# ---------------------------------------------------------------------------
# AC-4 — se3_from_4x4 returns gtsam.Pose3.
def test_ac4_se3_from_4x4_builds_identity_pose() -> None:
# Act
pose = se3_from_4x4(np.eye(4, dtype=np.float64))
# Assert
assert isinstance(pose, gtsam.Pose3)
translation = np.asarray(pose.translation())
assert translation.shape == (3,)
assert np.allclose(translation, np.zeros(3))
rotation = np.asarray(pose.rotation().matrix())
assert np.allclose(rotation, np.eye(3))
# ---------------------------------------------------------------------------
# Helpers for AC-5..AC-7 — minimal test-only mixin subclass that
# only sets the attributes the mixin reads. No native binding, no
# config DTO, no real fdr client wiring.
class _SpineHarness(FacadeSpine):
def __init__(
self,
*,
fdr: FakeFdrSink,
producer_id: str = "c1_vio.test",
strategy_label: str = "test_strategy",
warm_start_max_frames: int = 5,
feature_threshold: int = 50,
lost_frame_threshold: int = 3,
reported_state: VioState = VioState.INIT,
last_emitted_state: VioState | None = None,
frames_since_warmup: int = 0,
consecutive_lost: int = 0,
latest_bias: ImuBias | None = None,
) -> None:
self._fdr = fdr # type: ignore[assignment]
self._producer_id = producer_id
self._strategy_label = strategy_label
self._warm_start_max_frames = warm_start_max_frames
self._feature_threshold = feature_threshold
self._lost_frame_threshold = lost_frame_threshold
self._reported_state = reported_state
self._last_emitted_state = last_emitted_state
self._frames_since_warmup = frames_since_warmup
self._consecutive_lost = consecutive_lost
self._latest_bias = latest_bias or ImuBias(
accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)
)
def _fq(tracked: int) -> FeatureQuality:
return FeatureQuality(
tracked=tracked, new=0, lost=0, mean_parallax=1.0, mre_px=0.5
)
def _fdr() -> FakeFdrSink:
return FakeFdrSink(producer_id="c1_vio.test", capacity=64)
# ---------------------------------------------------------------------------
# AC-5 — _classify_state mirrors the existing logic across all strategies.
def test_ac5_classify_state_returns_init_during_warmup() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=0,
reported_state=VioState.INIT,
)
# Act
state = spine._classify_state(_fq(tracked=80))
# Assert
assert state == VioState.INIT
def test_ac5_classify_state_returns_tracking_after_warmup() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=5,
reported_state=VioState.INIT,
)
# Act
state = spine._classify_state(_fq(tracked=80))
# Assert
assert state == VioState.TRACKING
def test_ac5_classify_state_returns_degraded_below_threshold() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=10,
reported_state=VioState.TRACKING,
)
# Act
state = spine._classify_state(_fq(tracked=10))
# Assert
assert state == VioState.DEGRADED
# ---------------------------------------------------------------------------
# AC-6 — _tick_lost transitions correctly.
def test_ac6_tick_lost_demotes_tracking_to_degraded_first_call() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
lost_frame_threshold=3,
reported_state=VioState.TRACKING,
consecutive_lost=0,
)
# Act
spine._tick_lost("frame_42")
# Assert
assert spine._reported_state == VioState.DEGRADED
assert spine._consecutive_lost == 1
def test_ac6_tick_lost_escalates_to_lost_at_threshold() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
lost_frame_threshold=3,
reported_state=VioState.TRACKING,
consecutive_lost=0,
)
# Act
spine._tick_lost("frame_42")
spine._tick_lost("frame_43")
spine._tick_lost("frame_44")
# Assert
assert spine._reported_state == VioState.LOST
assert spine._consecutive_lost == 3
# ---------------------------------------------------------------------------
# AC-7 — _emit_transition emits exactly one FDR record per state change.
def test_ac7_emit_transition_no_record_on_steady_state() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
)
# Act
spine._emit_transition(VioState.TRACKING, "frame_42")
# Assert
assert fdr.records == []
assert spine._last_emitted_state == VioState.TRACKING
def test_ac7_emit_transition_one_record_per_state_change() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
producer_id="c1_vio.test",
strategy_label="test_strategy",
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
consecutive_lost=2,
latest_bias=ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)),
)
# Act
spine._emit_transition(VioState.DEGRADED, "frame_42")
# Assert
assert len(fdr.records) == 1
record = fdr.records[0]
assert record.kind == "vio.health"
assert record.producer_id == "c1_vio.test"
assert set(record.payload.keys()) == {
"state",
"consecutive_lost",
"bias_norm",
"strategy_label",
"frame_id",
}
assert record.payload["state"] == VioState.DEGRADED.value
assert record.payload["consecutive_lost"] == 2
assert record.payload["bias_norm"] == pytest.approx(1.0)
assert record.payload["strategy_label"] == "test_strategy"
assert record.payload["frame_id"] == "frame_42"
assert spine._last_emitted_state == VioState.DEGRADED
def test_ac7_emit_transition_idempotent_for_repeated_state() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
)
# Act
spine._emit_transition(VioState.DEGRADED, "frame_42")
spine._emit_transition(VioState.DEGRADED, "frame_43")
spine._emit_transition(VioState.DEGRADED, "frame_44")
# Assert
assert len(fdr.records) == 1
# ---------------------------------------------------------------------------
# AC-8 — AST regression guard: no duplicated free-function definitions
# remain in any strategy module. Mirrors the AZ-508 / AZ-526 / AZ-527
# precedent of AST-based source-asserts so a future strategy author
# cannot silently re-introduce a 4th local copy.
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
def test_ac8_no_duplicated_free_functions_remain_in_strategy_module(
strategy_module: str,
) -> None:
# Arrange
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
tree = ast.parse(src)
# Act
offenders = sorted(
{
node.name
for node in tree.body
if isinstance(node, ast.FunctionDef) and node.name in _FORBIDDEN_FREE_FUNCS
}
)
# Assert
assert offenders == [], (
f"{strategy_module} re-introduced consolidated free functions: "
f"{offenders}. They live in _facade_spine.py — import them from "
f"there instead of re-declaring."
)
# ---------------------------------------------------------------------------
# Risk-1 mitigation — strategies set every attribute the mixin reads.
# AST-based check: each strategy's __init__ writes every required attr
# before any mixin method could be called externally. This is the
# spec's "verify all required attributes set after construction"
# pattern, executed statically so it does not require booting a fake
# native binding for the assertion.
_REQUIRED_SPINE_ATTRS: Final[frozenset[str]] = frozenset(
{
"_reported_state",
"_frames_since_warmup",
"_warm_start_max_frames",
"_feature_threshold",
"_consecutive_lost",
"_lost_frame_threshold",
"_last_emitted_state",
"_producer_id",
"_strategy_label",
"_latest_bias",
"_fdr",
}
)
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
def test_strategy_init_sets_all_required_spine_attributes(
strategy_module: str,
) -> None:
# Arrange
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
tree = ast.parse(src)
strategy_class = next(
node
for node in ast.walk(tree)
if isinstance(node, ast.ClassDef) and node.name.endswith("Strategy")
)
init_method = next(
node
for node in strategy_class.body
if isinstance(node, ast.FunctionDef) and node.name == "__init__"
)
# Act
assigned_attrs: set[str] = set()
for stmt in ast.walk(init_method):
if not isinstance(stmt, (ast.Assign, ast.AnnAssign)):
continue
targets = stmt.targets if isinstance(stmt, ast.Assign) else [stmt.target]
for target in targets:
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assigned_attrs.add(target.attr)
# Assert
missing = _REQUIRED_SPINE_ATTRS - assigned_attrs
assert not missing, (
f"{strategy_module}.__init__ does not set {sorted(missing)}; "
f"the FacadeSpine mixin needs every one before any state-machine "
f"method runs."
)