From 9fb9e4a349171052cb3923e5600f0490cb3a2c97 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Sun, 3 May 2026 19:10:10 +0300 Subject: [PATCH] [AZ-232] Add safety anchor state machine Co-authored-by: Cursor --- .../AZ-232_safety_anchor_state_machine.md | 0 .../batch_09_cycle1_report.md | 36 +++++ .../reviews/batch_09_review.md | 54 +++++++ _docs/_autodev_state.md | 2 +- src/safety_anchor_wrapper/__init__.py | 17 +++ src/safety_anchor_wrapper/interfaces.py | 144 +++++++++++++++++- src/safety_anchor_wrapper/types.py | 40 ++++- tests/unit/test_safety_anchor_wrapper.py | 102 +++++++++++++ 8 files changed, 388 insertions(+), 7 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-232_safety_anchor_state_machine.md (100%) create mode 100644 _docs/03_implementation/batch_09_cycle1_report.md create mode 100644 _docs/03_implementation/reviews/batch_09_review.md create mode 100644 tests/unit/test_safety_anchor_wrapper.py diff --git a/_docs/02_tasks/todo/AZ-232_safety_anchor_state_machine.md b/_docs/02_tasks/done/AZ-232_safety_anchor_state_machine.md similarity index 100% rename from _docs/02_tasks/todo/AZ-232_safety_anchor_state_machine.md rename to _docs/02_tasks/done/AZ-232_safety_anchor_state_machine.md diff --git a/_docs/03_implementation/batch_09_cycle1_report.md b/_docs/03_implementation/batch_09_cycle1_report.md new file mode 100644 index 0000000..a1b946a --- /dev/null +++ b/_docs/03_implementation/batch_09_cycle1_report.md @@ -0,0 +1,36 @@ +# Batch Report + +**Batch**: 9 +**Tasks**: AZ-232_safety_anchor_state_machine +**Date**: 2026-05-03 + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|------|--------|----------------|-------|-------------|--------| +| AZ-232_safety_anchor_state_machine | Done | 4 files | Pass | 4/4 ACs covered | None | + +## AC Test Coverage: All covered + +| AC Ref | Coverage | +|--------|----------| +| AZ-232 AC-1 | `test_vio_state_updates_position_estimate_with_honest_covariance` verifies VIO updates emit source-labelled estimates with honest covariance. | +| AZ-232 AC-2 | `test_accepted_anchor_corrects_state_and_records_evidence` verifies accepted anchors promote `satellite_anchored` state and record evidence. | +| AZ-232 AC-3 | `test_blackout_degrades_then_reaches_no_fix_with_monotonic_covariance` verifies monotonic covariance growth and no-fix semantics. | +| AZ-232 AC-4 | `test_tile_write_eligibility_requires_trusted_low_covariance_pose` verifies conservative tile-write eligibility. | + +## Code Review Verdict: PASS + +Review report: `_docs/03_implementation/reviews/batch_09_review.md` + +## Auto-Fix Attempts: 0 + +## Stuck Agents: None + +## Verification + +- `.venv/bin/python -m black --check src tests e2e/replay` passed. +- `.venv/bin/python -m ruff check src tests e2e/replay` passed. +- `.venv/bin/python -m pytest` passed: 49 tests. + +## Next Batch: All tasks complete diff --git a/_docs/03_implementation/reviews/batch_09_review.md b/_docs/03_implementation/reviews/batch_09_review.md new file mode 100644 index 0000000..712b04e --- /dev/null +++ b/_docs/03_implementation/reviews/batch_09_review.md @@ -0,0 +1,54 @@ +# Code Review Report + +**Batch**: AZ-232_safety_anchor_state_machine +**Date**: 2026-05-03 +**Verdict**: PASS + +## Findings + +No findings. + +## Review Scope + +- Task spec: + - `_docs/02_tasks/todo/AZ-232_safety_anchor_state_machine.md` +- Changed files: + - `src/safety_anchor_wrapper/__init__.py` + - `src/safety_anchor_wrapper/interfaces.py` + - `src/safety_anchor_wrapper/types.py` + - `tests/unit/test_safety_anchor_wrapper.py` + +## Phase Notes + +### Spec Compliance + +- AZ-232 AC-1 is covered by `test_vio_state_updates_position_estimate_with_honest_covariance`. +- AZ-232 AC-2 is covered by `test_accepted_anchor_corrects_state_and_records_evidence`. +- AZ-232 AC-3 is covered by `test_blackout_degrades_then_reaches_no_fix_with_monotonic_covariance`. +- AZ-232 AC-4 is covered by `test_tile_write_eligibility_requires_trusted_low_covariance_pose`. + +### Code Quality + +The safety wrapper owns source-label, covariance, anchor-promotion, degraded-mode, and tile-eligibility decisions without reaching into VIO, Anchor Verification, MAVLink transport, or Tile Manager internals. + +### Security Quick-Scan + +No network calls, shell execution, dynamic code execution, hardcoded secrets, or credential logging were introduced. + +### Performance Scan + +State transitions are constant-time and operate on typed DTOs. No per-frame heavy retrieval or matching work was introduced. + +### Cross-Task Consistency + +The wrapper consumes `VioStatePacket` and `AnchorDecision` outputs from previous batches and emits shared `PositionEstimate` DTOs for MAVLink/GCS integration. + +### Architecture Compliance + +Imports respect `_docs/02_document/module-layout.md`: Safety And Anchor Wrapper imports shared contracts and does not call Tile Manager directly during anchor acceptance. + +## Verification + +- `.venv/bin/python -m black --check src tests e2e/replay` +- `.venv/bin/python -m ruff check src tests e2e/replay` +- `.venv/bin/python -m pytest` diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index ff38d26..77631bb 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -9,6 +9,6 @@ tracker: jira sub_step: phase: 1 name: batch-loop - detail: "batch 8: AZ-231_anchor_verification_matching" + detail: "batch 9: AZ-232_safety_anchor_state_machine" retry_count: 0 cycle: 1 diff --git a/src/safety_anchor_wrapper/__init__.py b/src/safety_anchor_wrapper/__init__.py index 38b17f1..4647c0a 100644 --- a/src/safety_anchor_wrapper/__init__.py +++ b/src/safety_anchor_wrapper/__init__.py @@ -1 +1,18 @@ """Safety and anchor wrapper component.""" + +from .interfaces import LocalizationStateMachine, SafetyAnchorStateMachine +from .types import ( + LocalizationSnapshot, + SafetyStateConfig, + TelemetryContext, + TileWriteEligibility, +) + +__all__ = [ + "LocalizationSnapshot", + "LocalizationStateMachine", + "SafetyAnchorStateMachine", + "SafetyStateConfig", + "TelemetryContext", + "TileWriteEligibility", +] diff --git a/src/safety_anchor_wrapper/interfaces.py b/src/safety_anchor_wrapper/interfaces.py index 8756467..4318d39 100644 --- a/src/safety_anchor_wrapper/interfaces.py +++ b/src/safety_anchor_wrapper/interfaces.py @@ -1,13 +1,151 @@ """Public localization state-machine interfaces.""" -from typing import Any, Protocol +from typing import Protocol + +from shared.contracts import AnchorDecision, PositionEstimate, VioStatePacket + +from .types import ( + LocalizationSnapshot, + SafetyStateConfig, + TelemetryContext, + TileWriteEligibility, +) class LocalizationStateMachine(Protocol): """Coordinates VIO propagation and anchor promotion decisions.""" - def update_vio(self, vio_state: Any) -> Any: + def update_vio( + self, vio_state: VioStatePacket, telemetry: TelemetryContext + ) -> LocalizationSnapshot: """Update the state machine with a VIO state packet.""" - def consider_anchor(self, anchor_decision: Any) -> Any: + def consider_anchor(self, anchor_decision: AnchorDecision) -> LocalizationSnapshot: """Evaluate a verified anchor decision.""" + + +class SafetyAnchorStateMachine: + """Owns authoritative source labels, covariance, and tile eligibility.""" + + def __init__(self, config: SafetyStateConfig | None = None) -> None: + self._config = config or SafetyStateConfig() + self._snapshot: LocalizationSnapshot | None = None + + @property + def snapshot(self) -> LocalizationSnapshot | None: + return self._snapshot + + def update_vio( + self, + vio_state: VioStatePacket, + telemetry: TelemetryContext, + ) -> LocalizationSnapshot: + covariance_m = self._covariance_from_vio(vio_state) + estimate = PositionEstimate( + timestamp_ns=vio_state.timestamp_ns, + latitude_deg=telemetry.latitude_hint_deg, + longitude_deg=telemetry.longitude_hint_deg, + altitude_m=telemetry.altitude_m, + covariance_semimajor_m=covariance_m, + source_label="vo_extrapolated", + fix_type=3, + horizontal_accuracy_m=covariance_m, + anchor_age_ms=0, + ) + self._snapshot = LocalizationSnapshot( + estimate=estimate, + mode="vo_extrapolated", + last_vio_state=vio_state, + ) + return self._snapshot + + def consider_anchor(self, anchor_decision: AnchorDecision) -> LocalizationSnapshot: + self._require_snapshot() + assert self._snapshot is not None + if not anchor_decision.accepted: + return self._snapshot + + pose = anchor_decision.estimated_pose or {} + covariance_m = max(anchor_decision.mean_reprojection_error_px, 0.5) + estimate = PositionEstimate( + timestamp_ns=self._snapshot.estimate.timestamp_ns, + latitude_deg=float(pose.get("latitude_deg", self._snapshot.estimate.latitude_deg)), + longitude_deg=float(pose.get("longitude_deg", self._snapshot.estimate.longitude_deg)), + altitude_m=float(pose.get("altitude_m", self._snapshot.estimate.altitude_m)), + covariance_semimajor_m=covariance_m, + source_label="satellite_anchored", + fix_type=3, + horizontal_accuracy_m=covariance_m, + anchor_age_ms=0, + ) + self._snapshot = LocalizationSnapshot( + estimate=estimate, + mode="satellite_anchored", + anchor_evidence=anchor_decision, + last_vio_state=self._snapshot.last_vio_state, + ) + return self._snapshot + + def propagate_blackout(self, timestamp_ns: int) -> LocalizationSnapshot: + self._require_snapshot() + assert self._snapshot is not None + previous = self._snapshot.estimate + covariance_m = previous.covariance_semimajor_m + self._config.dead_reckoning_growth_m + no_fix = covariance_m >= self._config.no_fix_covariance_threshold_m + source_label = "no_fix" if no_fix else "dead_reckoned" + fix_type = 0 if no_fix else 2 + estimate = PositionEstimate( + timestamp_ns=timestamp_ns, + latitude_deg=previous.latitude_deg, + longitude_deg=previous.longitude_deg, + altitude_m=previous.altitude_m, + covariance_semimajor_m=covariance_m, + source_label=source_label, + fix_type=fix_type, + horizontal_accuracy_m=max(covariance_m, 999.0 if no_fix else covariance_m), + anchor_age_ms=previous.anchor_age_ms + 1_000, + ) + self._snapshot = LocalizationSnapshot( + estimate=estimate, + mode=source_label, + anchor_evidence=self._snapshot.anchor_evidence, + last_vio_state=self._snapshot.last_vio_state, + ) + return self._snapshot + + def tile_write_eligibility(self) -> TileWriteEligibility: + self._require_snapshot() + assert self._snapshot is not None + estimate = self._snapshot.estimate + if estimate.source_label not in {"satellite_anchored", "vo_extrapolated"}: + return TileWriteEligibility( + eligible=False, + reason="untrusted_source_label", + estimate=estimate, + ) + if estimate.covariance_semimajor_m > self._config.tile_write_covariance_max_m: + return TileWriteEligibility( + eligible=False, + reason="covariance_too_high", + estimate=estimate, + ) + return TileWriteEligibility( + eligible=True, + reason="trusted_pose", + estimate=estimate, + ) + + def _covariance_from_vio(self, vio_state: VioStatePacket) -> float: + if not vio_state.covariance_hint: + return max( + self._config.vio_covariance_floor_m, + self._config.initial_covariance_m / max(vio_state.tracking_quality, 0.1), + ) + diagonal = [ + row[index] for index, row in enumerate(vio_state.covariance_hint) if index < len(row) + ] + return max(self._config.vio_covariance_floor_m, max(diagonal, default=0.0)) + + def _require_snapshot(self) -> None: + if self._snapshot is None: + raise RuntimeError("safety state requires a VIO update before this operation") diff --git a/src/safety_anchor_wrapper/types.py b/src/safety_anchor_wrapper/types.py index b28e98c..6c1627f 100644 --- a/src/safety_anchor_wrapper/types.py +++ b/src/safety_anchor_wrapper/types.py @@ -1,5 +1,39 @@ -"""Public safety wrapper type aliases.""" +"""Public safety wrapper models.""" -from typing import Any +from typing import Literal -PositionEstimateLike = Any +from pydantic import BaseModel, ConfigDict, Field, NonNegativeFloat, NonNegativeInt + +from shared.contracts import AnchorDecision, PositionEstimate, VioStatePacket + + +class SafetyWrapperModel(BaseModel): + model_config = ConfigDict(extra="forbid", frozen=True) + + +class TelemetryContext(SafetyWrapperModel): + timestamp_ns: NonNegativeInt + latitude_hint_deg: float = Field(ge=-90.0, le=90.0) + longitude_hint_deg: float = Field(ge=-180.0, le=180.0) + altitude_m: float + + +class SafetyStateConfig(SafetyWrapperModel): + initial_covariance_m: NonNegativeFloat = 2.0 + vio_covariance_floor_m: NonNegativeFloat = 1.0 + dead_reckoning_growth_m: NonNegativeFloat = 50.0 + no_fix_covariance_threshold_m: NonNegativeFloat = 500.0 + tile_write_covariance_max_m: NonNegativeFloat = 3.0 + + +class LocalizationSnapshot(SafetyWrapperModel): + estimate: PositionEstimate + mode: Literal["satellite_anchored", "vo_extrapolated", "dead_reckoned", "no_fix"] + anchor_evidence: AnchorDecision | None = None + last_vio_state: VioStatePacket | None = None + + +class TileWriteEligibility(SafetyWrapperModel): + eligible: bool + reason: str = Field(min_length=1) + estimate: PositionEstimate diff --git a/tests/unit/test_safety_anchor_wrapper.py b/tests/unit/test_safety_anchor_wrapper.py new file mode 100644 index 0000000..80300c6 --- /dev/null +++ b/tests/unit/test_safety_anchor_wrapper.py @@ -0,0 +1,102 @@ +from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext +from shared.contracts import AnchorDecision, VioStatePacket + + +def _telemetry() -> TelemetryContext: + return TelemetryContext( + timestamp_ns=1_000_000, + latitude_hint_deg=49.1, + longitude_hint_deg=36.1, + altitude_m=120.0, + ) + + +def _vio_state(**overrides: object) -> VioStatePacket: + payload: dict[str, object] = { + "timestamp_ns": 1_000_000, + "relative_pose": {"x_m": 1.0, "y_m": 0.0, "z_m": 0.0}, + "velocity_mps": (12.0, 0.0, 0.0), + "tracking_quality": 0.9, + "covariance_hint": [[1.8, 0.0], [0.0, 1.8]], + } + payload.update(overrides) + return VioStatePacket.model_validate(payload) + + +def _accepted_anchor() -> AnchorDecision: + return AnchorDecision( + candidate_id="chunk-1", + accepted=True, + estimated_pose={"latitude_deg": 49.2, "longitude_deg": 36.2, "altitude_m": 121.0}, + inliers=48, + mean_reprojection_error_px=1.2, + ) + + +def test_vio_state_updates_position_estimate_with_honest_covariance() -> None: + # Arrange + machine = SafetyAnchorStateMachine() + + # Act + snapshot = machine.update_vio(_vio_state(), _telemetry()) + + # Assert + assert snapshot.estimate.source_label == "vo_extrapolated" + assert snapshot.estimate.latitude_deg == 49.1 + assert snapshot.estimate.covariance_semimajor_m == 1.8 + assert snapshot.estimate.horizontal_accuracy_m >= snapshot.estimate.covariance_semimajor_m + + +def test_accepted_anchor_corrects_state_and_records_evidence() -> None: + # Arrange + machine = SafetyAnchorStateMachine() + machine.update_vio(_vio_state(), _telemetry()) + + # Act + snapshot = machine.consider_anchor(_accepted_anchor()) + + # Assert + assert snapshot.mode == "satellite_anchored" + assert snapshot.estimate.latitude_deg == 49.2 + assert snapshot.anchor_evidence is not None + assert snapshot.anchor_evidence.candidate_id == "chunk-1" + + +def test_blackout_degrades_then_reaches_no_fix_with_monotonic_covariance() -> None: + # Arrange + machine = SafetyAnchorStateMachine( + SafetyStateConfig(dead_reckoning_growth_m=250.0, no_fix_covariance_threshold_m=500.0) + ) + machine.update_vio(_vio_state(covariance_hint=[[100.0]]), _telemetry()) + + # Act + degraded = machine.propagate_blackout(2_000_000) + no_fix = machine.propagate_blackout(3_000_000) + + # Assert + assert degraded.mode == "dead_reckoned" + assert degraded.estimate.covariance_semimajor_m == 350.0 + assert no_fix.mode == "no_fix" + assert no_fix.estimate.fix_type == 0 + assert no_fix.estimate.covariance_semimajor_m > degraded.estimate.covariance_semimajor_m + + +def test_tile_write_eligibility_requires_trusted_low_covariance_pose() -> None: + # Arrange + machine = SafetyAnchorStateMachine(SafetyStateConfig(tile_write_covariance_max_m=3.0)) + machine.update_vio(_vio_state(covariance_hint=[[4.0]]), _telemetry()) + + # Act + high_covariance = machine.tile_write_eligibility() + machine.consider_anchor(_accepted_anchor()) + anchored = machine.tile_write_eligibility() + machine.propagate_blackout(2_000_000) + blackout = machine.tile_write_eligibility() + + # Assert + assert high_covariance.eligible is False + assert high_covariance.reason == "covariance_too_high" + assert anchored.eligible is True + assert anchored.reason == "trusted_pose" + assert blackout.eligible is False + assert blackout.reason == "untrusted_source_label"