[AZ-220] Add shared runtime contract models

Implement the shared DTO contract surface with validation so runtime components consume one public model set instead of duplicating shapes.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-03 13:22:50 +03:00
parent 72a9df6b57
commit 5156453224
8 changed files with 361 additions and 3 deletions
@@ -0,0 +1,34 @@
# Batch Report
**Batch**: 2
**Tasks**: AZ-220_shared_runtime_contracts
**Date**: 2026-05-03
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|----------------|-------|-------------|--------|
| AZ-220_shared_runtime_contracts | Done | 8 files | Pass | 2/2 ACs covered | None |
## AC Test Coverage: All covered
| AC Ref | Coverage |
|--------|----------|
| AC-1 | `test_runtime_dtos_accept_valid_minimal_values` verifies the shared DTO contract surface can be imported and constructed. |
| AC-2 | `test_missing_required_timestamp_is_rejected_with_structured_error`, `test_raw_frame_retention_is_rejected`, `test_position_accuracy_cannot_under_report_covariance`, and `test_accepted_anchor_requires_estimated_pose` verify malformed DTOs are rejected with structured Pydantic validation errors. |
## Code Review Verdict: PASS
Review report: `_docs/03_implementation/reviews/batch_02_review.md`
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay` passed.
- `.venv/bin/python -m ruff check src tests e2e/replay` passed.
- `.venv/bin/python -m pytest` passed: 11 tests.
## Next Batch: AZ-221_shared_geometry_time_sync, AZ-222_runtime_config_errors_telemetry
@@ -0,0 +1,24 @@
# Code Review Report
**Batch**: AZ-220_shared_runtime_contracts
**Date**: 2026-05-03
**Verdict**: PASS
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| - | - | - | - | No findings |
## Review Notes
- AC-1 is satisfied by the public exports in `src/shared/contracts/__init__.py` and the DTO models in `src/shared/contracts/models.py`.
- AC-2 is satisfied by Pydantic validation for missing required fields, raw frame retention, optimistic covariance reporting, and inconsistent anchor decisions.
- The implementation stays inside `shared/contracts` ownership and does not introduce component-specific algorithms.
- Raw frame payloads remain references only; the model rejects retained raw-frame payload flags.
## Verification
- `.venv/bin/python -m black --check src tests e2e/replay` passed.
- `.venv/bin/python -m ruff check src tests e2e/replay` passed.
- `.venv/bin/python -m pytest` passed: 11 tests.
+1 -1
View File
@@ -9,6 +9,6 @@ tracker: jira
sub_step: sub_step:
phase: 1 phase: 1
name: batch-loop name: batch-loop
detail: "batch 1: AZ-219_initial_structure" detail: "batch 2: AZ-220_shared_runtime_contracts"
retry_count: 0 retry_count: 0
cycle: 1 cycle: 1
+3 -1
View File
@@ -7,7 +7,9 @@ name = "gps-denied-onboard"
version = "0.1.0" version = "0.1.0"
description = "Jetson-hosted GPS-denied localization runtime scaffold." description = "Jetson-hosted GPS-denied localization runtime scaffold."
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [] dependencies = [
"pydantic==2.13.3",
]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
+26 -1
View File
@@ -1,3 +1,28 @@
"""Shared DTO and interface contract namespace.""" """Shared DTO and interface contract namespace."""
CONTRACT_VERSION = "0.1.0" from shared.contracts.models import (
AnchorDecision,
CacheTileRecord,
FdrEvent,
FramePacket,
PositionEstimate,
RuntimeContractModel,
TelemetrySample,
VioStatePacket,
VprCandidate,
)
CONTRACT_VERSION = "1.0.0"
__all__ = [
"AnchorDecision",
"CONTRACT_VERSION",
"CacheTileRecord",
"FdrEvent",
"FramePacket",
"PositionEstimate",
"RuntimeContractModel",
"TelemetrySample",
"VioStatePacket",
"VprCandidate",
]
+116
View File
@@ -0,0 +1,116 @@
"""Shared runtime DTO contracts.
These models intentionally carry only cross-component shape and validation rules.
Component algorithms and storage choices stay in their owning packages.
"""
from datetime import date
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, NonNegativeFloat, NonNegativeInt, PositiveFloat
from pydantic import model_validator
class RuntimeContractModel(BaseModel):
"""Base settings for public runtime contracts."""
model_config = ConfigDict(extra="forbid", frozen=True)
class FramePacket(RuntimeContractModel):
frame_id: str = Field(min_length=1)
timestamp_ns: NonNegativeInt
image_ref: str = Field(min_length=1)
calibration_id: str = Field(min_length=1)
occlusion: Literal["clear", "partial", "total", "unreadable"]
quality: float = Field(ge=0.0, le=1.0)
normalization_hint: str | None = None
raw_frame_retained: bool = False
@model_validator(mode="after")
def raw_frames_must_not_be_retained(self) -> "FramePacket":
if self.raw_frame_retained:
raise ValueError("raw frame payloads must be referenced, not retained")
return self
class TelemetrySample(RuntimeContractModel):
timestamp_ns: NonNegativeInt
imu: dict[str, float]
attitude: dict[str, float]
altitude_m: float
airspeed_mps: NonNegativeFloat
gps_health: Literal["healthy", "degraded", "lost", "spoofed"]
class VioStatePacket(RuntimeContractModel):
timestamp_ns: NonNegativeInt
relative_pose: dict[str, float]
velocity_mps: tuple[float, float, float]
bias_estimate: dict[str, float] | None = None
tracking_quality: float = Field(ge=0.0, le=1.0)
covariance_hint: list[list[float]] | None = None
class PositionEstimate(RuntimeContractModel):
timestamp_ns: NonNegativeInt
latitude_deg: float = Field(ge=-90.0, le=90.0)
longitude_deg: float = Field(ge=-180.0, le=180.0)
altitude_m: float
covariance_semimajor_m: NonNegativeFloat
source_label: Literal["satellite_anchored", "vo_extrapolated", "dead_reckoned", "no_fix"]
fix_type: int = Field(ge=0, le=3)
horizontal_accuracy_m: NonNegativeFloat
anchor_age_ms: NonNegativeInt
@model_validator(mode="after")
def accuracy_must_not_under_report_covariance(self) -> "PositionEstimate":
if self.horizontal_accuracy_m < self.covariance_semimajor_m:
raise ValueError("horizontal_accuracy_m must not under-report covariance_semimajor_m")
return self
class VprCandidate(RuntimeContractModel):
chunk_id: str = Field(min_length=1)
tile_id: str = Field(min_length=1)
score: float = Field(ge=0.0, le=1.0)
footprint: dict[str, float]
freshness_status: Literal["fresh", "stale", "rejected"]
class AnchorDecision(RuntimeContractModel):
candidate_id: str = Field(min_length=1)
accepted: bool
estimated_pose: dict[str, float] | None = None
inliers: NonNegativeInt
mean_reprojection_error_px: NonNegativeFloat
rejection_reason: str | None = None
@model_validator(mode="after")
def accepted_anchors_require_pose(self) -> "AnchorDecision":
if self.accepted and self.estimated_pose is None:
raise ValueError("accepted anchor decisions require estimated_pose")
if self.accepted and self.rejection_reason is not None:
raise ValueError("accepted anchor decisions must not include rejection_reason")
return self
class CacheTileRecord(RuntimeContractModel):
tile_id: str = Field(min_length=1)
crs: str = Field(min_length=1)
meters_per_pixel: PositiveFloat
capture_date: date
signature_hash: str = Field(min_length=1)
trust_level: Literal["trusted", "generated", "quarantined", "rejected"]
freshness_status: Literal["fresh", "stale", "rejected"]
provenance: str = Field(min_length=1)
class FdrEvent(RuntimeContractModel):
event_type: str = Field(min_length=1)
timestamp_ns: NonNegativeInt
component: str = Field(min_length=1)
severity: Literal["debug", "info", "warning", "error", "critical"]
payload_ref: str = Field(min_length=1)
mission_id: str = Field(min_length=1)
run_id: str = Field(min_length=1)
+157
View File
@@ -0,0 +1,157 @@
import pytest
from pydantic import ValidationError
from shared.contracts import (
AnchorDecision,
CacheTileRecord,
FdrEvent,
FramePacket,
PositionEstimate,
TelemetrySample,
VioStatePacket,
VprCandidate,
)
def test_runtime_dtos_accept_valid_minimal_values() -> None:
# Arrange
timestamp_ns = 1_000_000
# Act
contracts = [
FramePacket(
frame_id="frame-1",
timestamp_ns=timestamp_ns,
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
),
TelemetrySample(
timestamp_ns=timestamp_ns,
imu={"accel_x": 0.1},
attitude={"roll": 0.0},
altitude_m=950.0,
airspeed_mps=16.0,
gps_health="lost",
),
VioStatePacket(
timestamp_ns=timestamp_ns,
relative_pose={"x": 1.0},
velocity_mps=(1.0, 0.0, 0.0),
tracking_quality=0.8,
),
PositionEstimate(
timestamp_ns=timestamp_ns,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=950.0,
covariance_semimajor_m=12.0,
source_label="satellite_anchored",
fix_type=3,
horizontal_accuracy_m=12.0,
anchor_age_ms=200,
),
VprCandidate(
chunk_id="chunk-1",
tile_id="tile-1",
score=0.87,
footprint={"min_lat": 49.0},
freshness_status="fresh",
),
AnchorDecision(
candidate_id="candidate-1",
accepted=True,
estimated_pose={"x": 1.0},
inliers=42,
mean_reprojection_error_px=0.8,
),
CacheTileRecord(
tile_id="tile-1",
crs="EPSG:3857",
meters_per_pixel=0.3,
capture_date="2026-05-03",
signature_hash="sha256:abc",
trust_level="trusted",
freshness_status="fresh",
provenance="suite-satellite-service",
),
FdrEvent(
event_type="health",
timestamp_ns=timestamp_ns,
component="shared.contracts",
severity="info",
payload_ref="fdr://segment/1",
mission_id="mission-1",
run_id="run-1",
),
]
# Assert
assert len(contracts) == 8
def test_missing_required_timestamp_is_rejected_with_structured_error() -> None:
# Act
with pytest.raises(ValidationError) as error:
FramePacket(
frame_id="frame-1",
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
)
# Assert
assert error.value.errors()[0]["loc"] == ("timestamp_ns",)
assert error.value.errors()[0]["type"] == "missing"
def test_raw_frame_retention_is_rejected() -> None:
# Act
with pytest.raises(ValidationError) as error:
FramePacket(
frame_id="frame-1",
timestamp_ns=1,
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
raw_frame_retained=True,
)
# Assert
assert "raw frame payloads must be referenced" in str(error.value)
def test_position_accuracy_cannot_under_report_covariance() -> None:
# Act
with pytest.raises(ValidationError) as error:
PositionEstimate(
timestamp_ns=1,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=950.0,
covariance_semimajor_m=50.0,
source_label="satellite_anchored",
fix_type=3,
horizontal_accuracy_m=10.0,
anchor_age_ms=200,
)
# Assert
assert "must not under-report" in str(error.value)
def test_accepted_anchor_requires_estimated_pose() -> None:
# Act
with pytest.raises(ValidationError) as error:
AnchorDecision(
candidate_id="candidate-1",
accepted=True,
inliers=42,
mean_reprojection_error_px=0.8,
)
# Assert
assert "accepted anchor decisions require estimated_pose" in str(error.value)