mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 10:11:12 +00:00
feat(01-02): add Phase-3/4 stub Protocols (anchor_verifier, safety_state, flight_recorder)
- anchor_verifier.protocol: AnchorVerifier + VerifierDecision dataclass (Phase 3 VERIFY-01..05 fills semantics) - safety_state.protocol: SafetyAnchorStateMachine + SourceLabel enum (Phase 3 SAFE-01..06 fills implementation) - flight_recorder.protocol: FlightRecorder + RecorderHealth enum + FdrExportResult (Phase 4 FDR-01..06 fills) - Enum string values match REQUIREMENTS.md SAFE-01 / FDR-04 - Not registered in build_pipeline yet — Phase 1 only requires existence
This commit is contained in:
@@ -0,0 +1,37 @@
|
||||
"""Protocol surface for the anchor_verifier component (Phase 3, VERIFY-01..05).
|
||||
|
||||
Phase 1: stub only — semantics filled in Phase 3. The Protocol must
|
||||
exist now so the ARCH-01 directory inventory is complete at end of
|
||||
Phase 1.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from gps_denied.hot_types.alignment_result import AlignmentResult
|
||||
from gps_denied.hot_types.satellite_anchor import SatelliteAnchor
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class VerifierDecision:
|
||||
"""Result of an :meth:`AnchorVerifier.verify` call.
|
||||
|
||||
Phase 3 will refine the rejection-reason taxonomy (currently free-text:
|
||||
``too_few_inliers`` / ``mre_above_threshold`` / ``degenerate_homography``
|
||||
/ ``freshness_expired``).
|
||||
"""
|
||||
|
||||
accepted: bool
|
||||
anchor: SatelliteAnchor | None = None
|
||||
rejection_reason: str | None = None
|
||||
inlier_count: int = 0
|
||||
mean_reprojection_error_px: float = 0.0
|
||||
homography_condition_number: float = 0.0
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class AnchorVerifier(Protocol):
|
||||
"""Geometry-gated anchor verifier. Filled in Phase 3."""
|
||||
|
||||
def verify(self, candidate: AlignmentResult) -> VerifierDecision: ...
|
||||
@@ -0,0 +1,40 @@
|
||||
"""Protocol surface for the flight_recorder component (Phase 4, FDR-01..06).
|
||||
|
||||
Phase 1: stub only — Phase 4 lands the in-memory + disk implementations
|
||||
behind this Protocol.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
|
||||
class RecorderHealth(str, Enum):
|
||||
"""Health tier of the flight-data recorder (FDR-04)."""
|
||||
|
||||
OK = "ok"
|
||||
DEGRADED = "degraded" # >= 90% storage
|
||||
CRITICAL = "critical" # storage limit reached
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class FdrExportResult:
|
||||
"""Outcome of :meth:`FlightRecorder.export`."""
|
||||
|
||||
flight_id: str
|
||||
segment_count: int
|
||||
total_bytes: int
|
||||
path: str | None = None # set for DiskFlightRecorder, None for in-memory
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class FlightRecorder(Protocol):
|
||||
"""Append-only flight-data recorder per FDR-01. Filled in Phase 4."""
|
||||
|
||||
def append_event(self, event: dict[str, Any]) -> None: ...
|
||||
|
||||
def export(self) -> FdrExportResult: ...
|
||||
|
||||
@property
|
||||
def health(self) -> RecorderHealth: ...
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Protocol surface for the safety_state component (Phase 3, SAFE-01..06).
|
||||
|
||||
Phase 1: stub only — the SafetyAnchorStateMachine becomes the
|
||||
authoritative source_label owner per SAFE-01 in Phase 3.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from gps_denied.hot_types.position_estimate import PositionEstimate
|
||||
from gps_denied.hot_types.satellite_anchor import SatelliteAnchor
|
||||
|
||||
|
||||
class SourceLabel(str, Enum):
|
||||
"""Authoritative label for the provenance of a PositionEstimate (SAFE-01)."""
|
||||
|
||||
SATELLITE_ANCHORED = "satellite_anchored"
|
||||
VO_EXTRAPOLATED = "vo_extrapolated"
|
||||
DEAD_RECKONED = "dead_reckoned"
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class SafetyAnchorStateMachine(Protocol):
|
||||
"""Authoritative source_label owner per SAFE-01. Filled in Phase 3."""
|
||||
|
||||
@property
|
||||
def source_label(self) -> SourceLabel: ...
|
||||
|
||||
@property
|
||||
def anchor_age_ms(self) -> float: ...
|
||||
|
||||
@property
|
||||
def can_persist_tile(self) -> bool: ...
|
||||
|
||||
def on_anchor_accepted(self, anchor: SatelliteAnchor) -> None: ...
|
||||
|
||||
def on_anchor_rejected(self, reason: str) -> None: ...
|
||||
|
||||
def on_vo_update(self, timestamp: float) -> None: ...
|
||||
|
||||
def on_visual_blackout(self) -> None: ...
|
||||
|
||||
def annotate(self, estimate: PositionEstimate) -> PositionEstimate: ...
|
||||
Reference in New Issue
Block a user