Files
Oleksandr Bezdieniezhnykh f12789ebf0 [AZ-528] Consolidate c1_vio strategy facade orchestration spine
Replace 3-way byte-equivalent orchestration-spine duplication across
okvis2.py / vins_mono.py / klt_ransac.py with a single c1-internal
helper at components/c1_vio/_facade_spine.py. Closes cumulative
review batches 52-54 Finding F1. No behaviour change — all existing
AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified (114 c1_vio tests
green, 237 with adjacent regression suite).

The helper exposes 5 stateless free functions (now_iso, bias_norm,
se3_from_4x4, frame_ts_ns, frame_image) and a FacadeSpine mixin
class providing _classify_state / _tick_lost / _emit_transition.
Concrete strategies inherit the mixin and set spine-required
instance attributes in __init__. Mirrors the AZ-527 precedent for
c2_vpr-side _assert_engine_output_dim consolidation.

New test file test_az528_facade_spine.py covers AC-1..AC-8 with 19
tests, including an AST regression guard that prevents future
re-introduction of the consolidated free functions in any strategy
module, plus a Risk-1 static check that every strategy's __init__
assigns every spine-required attribute.

Archive AZ-528 task spec to done/, bump autodev state to batch 56.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 03:03:16 +03:00

458 lines
14 KiB
Python

"""AZ-528 — c1_vio facade orchestration-spine consolidation tests.
Covers AC-1..AC-8 of
``_docs/02_tasks/todo/AZ-528_hygiene_c1_vio_facade_spine_consolidation.md``:
- AC-1: helper module exposes the documented surface.
- AC-2: ``now_iso()`` returns an aware UTC ISO-8601 timestamp with
``+00:00`` offset (NOT the ``Z``-suffix variant — that is
``iso_ts_from_clock`` in AZ-526).
- AC-3: ``bias_norm`` matches the L2 formula on a hand-checked vector.
- AC-4: ``se3_from_4x4`` builds a ``gtsam.Pose3`` with the expected
identity rotation + zero translation when fed ``np.eye(4)``.
- AC-5: ``FacadeSpine._classify_state`` returns INIT during warm-up,
TRACKING above the threshold, DEGRADED below it.
- AC-6: ``FacadeSpine._tick_lost`` demotes TRACKING → DEGRADED on the
first lost frame and escalates to LOST at the threshold.
- AC-7: ``FacadeSpine._emit_transition`` emits exactly one
``vio.health`` FDR record per state change (no record on
steady-state).
- AC-8: AST-walk regression guard — zero module-level definitions of
the consolidated free functions remain in any of the three strategy
modules. Mirrors the AZ-508 / AZ-526 / AZ-527 pattern.
AC-9 (existing AZ-332 / AZ-333 / AZ-334 AC tests pass unmodified) and
AC-10 (AZ-270 layer lint) ride the existing test files; this module
does not re-stage them.
"""
from __future__ import annotations
import ast
from datetime import datetime
from pathlib import Path
from typing import Final
import gtsam
import numpy as np
import pytest
from gps_denied_onboard._types.nav import FeatureQuality, ImuBias, VioState
from gps_denied_onboard.components.c1_vio._facade_spine import (
FacadeSpine,
bias_norm,
frame_image,
frame_ts_ns,
now_iso,
se3_from_4x4,
)
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
_C1_VIO_SRC: Final[Path] = (
Path(__file__).resolve().parents[3]
/ "src"
/ "gps_denied_onboard"
/ "components"
/ "c1_vio"
)
_STRATEGY_MODULES: Final[tuple[str, ...]] = (
"okvis2.py",
"vins_mono.py",
"klt_ransac.py",
)
_FORBIDDEN_FREE_FUNCS: Final[frozenset[str]] = frozenset(
{"_now_iso", "_bias_norm", "_se3_from_4x4", "_frame_ts_ns", "_frame_image"}
)
# ---------------------------------------------------------------------------
# AC-1 — surface.
def test_ac1_helper_module_exposes_documented_surface() -> None:
# Assert
assert callable(now_iso)
assert callable(bias_norm)
assert callable(se3_from_4x4)
assert callable(frame_ts_ns)
assert callable(frame_image)
assert isinstance(FacadeSpine, type)
for method_name in ("_classify_state", "_tick_lost", "_emit_transition"):
assert callable(getattr(FacadeSpine, method_name)), method_name
# ---------------------------------------------------------------------------
# AC-2 — now_iso ISO-8601 UTC with +00:00 offset.
def test_ac2_now_iso_returns_aware_utc_with_plus_offset() -> None:
# Act
stamp = now_iso()
# Assert
parsed = datetime.fromisoformat(stamp)
assert parsed.utcoffset() is not None, "expected aware datetime"
assert parsed.utcoffset().total_seconds() == 0.0 # type: ignore[union-attr]
assert stamp.endswith("+00:00"), (
f"expected '+00:00' offset suffix, not 'Z'; got {stamp!r}"
)
# ---------------------------------------------------------------------------
# AC-3 — bias_norm matches the L2 formula.
def test_ac3_bias_norm_matches_l2_formula() -> None:
# Arrange
bias = ImuBias(accel_bias=(1.0, 2.0, 2.0), gyro_bias=(0.0, 0.0, 0.0))
# Act
result = bias_norm(bias)
# Assert
assert result == pytest.approx(3.0)
def test_ac3_bias_norm_includes_gyro_component() -> None:
# Arrange
bias = ImuBias(accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 4.0, 3.0))
# Act
result = bias_norm(bias)
# Assert
assert result == pytest.approx(5.0)
# ---------------------------------------------------------------------------
# AC-4 — se3_from_4x4 returns gtsam.Pose3.
def test_ac4_se3_from_4x4_builds_identity_pose() -> None:
# Act
pose = se3_from_4x4(np.eye(4, dtype=np.float64))
# Assert
assert isinstance(pose, gtsam.Pose3)
translation = np.asarray(pose.translation())
assert translation.shape == (3,)
assert np.allclose(translation, np.zeros(3))
rotation = np.asarray(pose.rotation().matrix())
assert np.allclose(rotation, np.eye(3))
# ---------------------------------------------------------------------------
# Helpers for AC-5..AC-7 — minimal test-only mixin subclass that
# only sets the attributes the mixin reads. No native binding, no
# config DTO, no real fdr client wiring.
class _SpineHarness(FacadeSpine):
def __init__(
self,
*,
fdr: FakeFdrSink,
producer_id: str = "c1_vio.test",
strategy_label: str = "test_strategy",
warm_start_max_frames: int = 5,
feature_threshold: int = 50,
lost_frame_threshold: int = 3,
reported_state: VioState = VioState.INIT,
last_emitted_state: VioState | None = None,
frames_since_warmup: int = 0,
consecutive_lost: int = 0,
latest_bias: ImuBias | None = None,
) -> None:
self._fdr = fdr # type: ignore[assignment]
self._producer_id = producer_id
self._strategy_label = strategy_label
self._warm_start_max_frames = warm_start_max_frames
self._feature_threshold = feature_threshold
self._lost_frame_threshold = lost_frame_threshold
self._reported_state = reported_state
self._last_emitted_state = last_emitted_state
self._frames_since_warmup = frames_since_warmup
self._consecutive_lost = consecutive_lost
self._latest_bias = latest_bias or ImuBias(
accel_bias=(0.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)
)
def _fq(tracked: int) -> FeatureQuality:
return FeatureQuality(
tracked=tracked, new=0, lost=0, mean_parallax=1.0, mre_px=0.5
)
def _fdr() -> FakeFdrSink:
return FakeFdrSink(producer_id="c1_vio.test", capacity=64)
# ---------------------------------------------------------------------------
# AC-5 — _classify_state mirrors the existing logic across all strategies.
def test_ac5_classify_state_returns_init_during_warmup() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=0,
reported_state=VioState.INIT,
)
# Act
state = spine._classify_state(_fq(tracked=80))
# Assert
assert state == VioState.INIT
def test_ac5_classify_state_returns_tracking_after_warmup() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=5,
reported_state=VioState.INIT,
)
# Act
state = spine._classify_state(_fq(tracked=80))
# Assert
assert state == VioState.TRACKING
def test_ac5_classify_state_returns_degraded_below_threshold() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
warm_start_max_frames=5,
feature_threshold=50,
frames_since_warmup=10,
reported_state=VioState.TRACKING,
)
# Act
state = spine._classify_state(_fq(tracked=10))
# Assert
assert state == VioState.DEGRADED
# ---------------------------------------------------------------------------
# AC-6 — _tick_lost transitions correctly.
def test_ac6_tick_lost_demotes_tracking_to_degraded_first_call() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
lost_frame_threshold=3,
reported_state=VioState.TRACKING,
consecutive_lost=0,
)
# Act
spine._tick_lost("frame_42")
# Assert
assert spine._reported_state == VioState.DEGRADED
assert spine._consecutive_lost == 1
def test_ac6_tick_lost_escalates_to_lost_at_threshold() -> None:
# Arrange
spine = _SpineHarness(
fdr=_fdr(),
lost_frame_threshold=3,
reported_state=VioState.TRACKING,
consecutive_lost=0,
)
# Act
spine._tick_lost("frame_42")
spine._tick_lost("frame_43")
spine._tick_lost("frame_44")
# Assert
assert spine._reported_state == VioState.LOST
assert spine._consecutive_lost == 3
# ---------------------------------------------------------------------------
# AC-7 — _emit_transition emits exactly one FDR record per state change.
def test_ac7_emit_transition_no_record_on_steady_state() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
)
# Act
spine._emit_transition(VioState.TRACKING, "frame_42")
# Assert
assert fdr.records == []
assert spine._last_emitted_state == VioState.TRACKING
def test_ac7_emit_transition_one_record_per_state_change() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
producer_id="c1_vio.test",
strategy_label="test_strategy",
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
consecutive_lost=2,
latest_bias=ImuBias(accel_bias=(1.0, 0.0, 0.0), gyro_bias=(0.0, 0.0, 0.0)),
)
# Act
spine._emit_transition(VioState.DEGRADED, "frame_42")
# Assert
assert len(fdr.records) == 1
record = fdr.records[0]
assert record.kind == "vio.health"
assert record.producer_id == "c1_vio.test"
assert set(record.payload.keys()) == {
"state",
"consecutive_lost",
"bias_norm",
"strategy_label",
"frame_id",
}
assert record.payload["state"] == VioState.DEGRADED.value
assert record.payload["consecutive_lost"] == 2
assert record.payload["bias_norm"] == pytest.approx(1.0)
assert record.payload["strategy_label"] == "test_strategy"
assert record.payload["frame_id"] == "frame_42"
assert spine._last_emitted_state == VioState.DEGRADED
def test_ac7_emit_transition_idempotent_for_repeated_state() -> None:
# Arrange
fdr = _fdr()
spine = _SpineHarness(
fdr=fdr,
reported_state=VioState.TRACKING,
last_emitted_state=VioState.TRACKING,
)
# Act
spine._emit_transition(VioState.DEGRADED, "frame_42")
spine._emit_transition(VioState.DEGRADED, "frame_43")
spine._emit_transition(VioState.DEGRADED, "frame_44")
# Assert
assert len(fdr.records) == 1
# ---------------------------------------------------------------------------
# AC-8 — AST regression guard: no duplicated free-function definitions
# remain in any strategy module. Mirrors the AZ-508 / AZ-526 / AZ-527
# precedent of AST-based source-asserts so a future strategy author
# cannot silently re-introduce a 4th local copy.
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
def test_ac8_no_duplicated_free_functions_remain_in_strategy_module(
strategy_module: str,
) -> None:
# Arrange
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
tree = ast.parse(src)
# Act
offenders = sorted(
{
node.name
for node in tree.body
if isinstance(node, ast.FunctionDef) and node.name in _FORBIDDEN_FREE_FUNCS
}
)
# Assert
assert offenders == [], (
f"{strategy_module} re-introduced consolidated free functions: "
f"{offenders}. They live in _facade_spine.py — import them from "
f"there instead of re-declaring."
)
# ---------------------------------------------------------------------------
# Risk-1 mitigation — strategies set every attribute the mixin reads.
# AST-based check: each strategy's __init__ writes every required attr
# before any mixin method could be called externally. This is the
# spec's "verify all required attributes set after construction"
# pattern, executed statically so it does not require booting a fake
# native binding for the assertion.
_REQUIRED_SPINE_ATTRS: Final[frozenset[str]] = frozenset(
{
"_reported_state",
"_frames_since_warmup",
"_warm_start_max_frames",
"_feature_threshold",
"_consecutive_lost",
"_lost_frame_threshold",
"_last_emitted_state",
"_producer_id",
"_strategy_label",
"_latest_bias",
"_fdr",
}
)
@pytest.mark.parametrize("strategy_module", _STRATEGY_MODULES)
def test_strategy_init_sets_all_required_spine_attributes(
strategy_module: str,
) -> None:
# Arrange
src = (_C1_VIO_SRC / strategy_module).read_text(encoding="utf-8")
tree = ast.parse(src)
strategy_class = next(
node
for node in ast.walk(tree)
if isinstance(node, ast.ClassDef) and node.name.endswith("Strategy")
)
init_method = next(
node
for node in strategy_class.body
if isinstance(node, ast.FunctionDef) and node.name == "__init__"
)
# Act
assigned_attrs: set[str] = set()
for stmt in ast.walk(init_method):
if not isinstance(stmt, (ast.Assign, ast.AnnAssign)):
continue
targets = stmt.targets if isinstance(stmt, ast.Assign) else [stmt.target]
for target in targets:
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assigned_attrs.add(target.attr)
# Assert
missing = _REQUIRED_SPINE_ATTRS - assigned_attrs
assert not missing, (
f"{strategy_module}.__init__ does not set {sorted(missing)}; "
f"the FacadeSpine mixin needs every one before any state-machine "
f"method runs."
)