[AZ-221] [AZ-222] Add shared runtime helpers

Provide deterministic geometry/time-sync helpers and structured config, error, health, and telemetry primitives for downstream runtime components.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-03 14:01:04 +03:00
parent 5156453224
commit c3650d979d
17 changed files with 495 additions and 1 deletions
+4
View File
@@ -1 +1,5 @@
"""Runtime configuration helper namespace."""
from shared.config.models import RuntimeProfile, readiness_error, validate_runtime_profile
__all__ = ["RuntimeProfile", "readiness_error", "validate_runtime_profile"]
+59
View File
@@ -0,0 +1,59 @@
"""Runtime profile configuration and readiness validation."""
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator
from shared.errors import ErrorEnvelope, ResultEnvelope
class RuntimeProfile(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
environment: Literal["development", "ci", "staging", "jetson", "production"]
config_dir: str = Field(min_length=1)
cache_dir: str | None = None
fdr_dir: str | None = None
database_url: str | None = None
mavlink_url: str | None = None
camera_source: str | None = None
signing_key_ref: str | None = None
@model_validator(mode="after")
def production_requires_runtime_paths(self) -> "RuntimeProfile":
if self.environment != "production":
return self
missing = [
name
for name in (
"cache_dir",
"fdr_dir",
"database_url",
"mavlink_url",
"camera_source",
"signing_key_ref",
)
if getattr(self, name) in (None, "")
]
if missing:
raise ValueError(f"production profile missing required settings: {', '.join(missing)}")
return self
def readiness_error(component: str, message: str) -> ErrorEnvelope:
return ErrorEnvelope(
component=component,
category="configuration",
message=message,
severity="critical",
retryable=False,
)
def validate_runtime_profile(component: str, payload: dict[str, object]) -> ResultEnvelope:
try:
RuntimeProfile.model_validate(payload)
except ValidationError as error:
return ResultEnvelope.failure(readiness_error(component, str(error)))
return ResultEnvelope.success()
+4
View File
@@ -1 +1,5 @@
"""Shared error envelope namespace."""
from shared.errors.models import ErrorEnvelope, ResultEnvelope
__all__ = ["ErrorEnvelope", "ResultEnvelope"]
+38
View File
@@ -0,0 +1,38 @@
"""Shared structured error envelopes."""
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
class ErrorEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
component: str = Field(min_length=1)
category: Literal[
"configuration",
"dependency",
"validation",
"runtime",
"security",
"resource",
]
message: str = Field(min_length=1)
severity: Literal["info", "warning", "error", "critical"]
retryable: bool
cause: str | None = None
class ResultEnvelope(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
ok: bool
error: ErrorEnvelope | None = None
@classmethod
def success(cls) -> "ResultEnvelope":
return cls(ok=True)
@classmethod
def failure(cls, error: ErrorEnvelope) -> "ResultEnvelope":
return cls(ok=False, error=error)
+20
View File
@@ -1 +1,21 @@
"""Geospatial geometry helper namespace."""
from shared.geo_geometry.models import (
CameraFootprint,
LocalNedCoordinate,
Wgs84Coordinate,
distance_m,
local_to_wgs84,
nadir_camera_footprint,
wgs84_to_local,
)
__all__ = [
"CameraFootprint",
"LocalNedCoordinate",
"Wgs84Coordinate",
"distance_m",
"local_to_wgs84",
"nadir_camera_footprint",
"wgs84_to_local",
]
+83
View File
@@ -0,0 +1,83 @@
"""Deterministic geospatial helper models and calculations."""
from math import cos, radians, sqrt
from pydantic import BaseModel, ConfigDict, Field, PositiveFloat
EARTH_RADIUS_M = 6_378_137.0
class GeometryModel(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
class Wgs84Coordinate(GeometryModel):
latitude_deg: float = Field(ge=-90.0, le=90.0)
longitude_deg: float = Field(ge=-180.0, le=180.0)
altitude_m: float = 0.0
class LocalNedCoordinate(GeometryModel):
north_m: float
east_m: float
down_m: float = 0.0
class CameraFootprint(GeometryModel):
center: Wgs84Coordinate
ground_width_m: PositiveFloat
ground_height_m: PositiveFloat
ground_sample_distance_m_per_px: PositiveFloat
def wgs84_to_local(origin: Wgs84Coordinate, point: Wgs84Coordinate) -> LocalNedCoordinate:
lat_delta_rad = radians(point.latitude_deg - origin.latitude_deg)
lon_delta_rad = radians(point.longitude_deg - origin.longitude_deg)
latitude_scale = cos(radians(origin.latitude_deg))
return LocalNedCoordinate(
north_m=lat_delta_rad * EARTH_RADIUS_M,
east_m=lon_delta_rad * EARTH_RADIUS_M * latitude_scale,
down_m=origin.altitude_m - point.altitude_m,
)
def local_to_wgs84(origin: Wgs84Coordinate, local: LocalNedCoordinate) -> Wgs84Coordinate:
latitude_deg = origin.latitude_deg + (local.north_m / EARTH_RADIUS_M) * (
180.0 / 3.141592653589793
)
latitude_scale = cos(radians(origin.latitude_deg))
longitude_deg = origin.longitude_deg + (local.east_m / (EARTH_RADIUS_M * latitude_scale)) * (
180.0 / 3.141592653589793
)
return Wgs84Coordinate(
latitude_deg=latitude_deg,
longitude_deg=longitude_deg,
altitude_m=origin.altitude_m - local.down_m,
)
def distance_m(first: Wgs84Coordinate, second: Wgs84Coordinate) -> float:
local = wgs84_to_local(first, second)
return sqrt(local.north_m**2 + local.east_m**2 + local.down_m**2)
def nadir_camera_footprint(
center: Wgs84Coordinate,
altitude_agl_m: PositiveFloat,
sensor_width_px: int,
sensor_height_px: int,
ground_sample_distance_m_per_px: PositiveFloat,
) -> CameraFootprint:
if sensor_width_px <= 0 or sensor_height_px <= 0:
raise ValueError("sensor dimensions must be positive")
if altitude_agl_m <= 0:
raise ValueError("altitude_agl_m must be positive")
return CameraFootprint(
center=center,
ground_width_m=sensor_width_px * ground_sample_distance_m_per_px,
ground_height_m=sensor_height_px * ground_sample_distance_m_per_px,
ground_sample_distance_m_per_px=ground_sample_distance_m_per_px,
)
+4
View File
@@ -1 +1,5 @@
"""Structured telemetry and health metadata namespace."""
from shared.telemetry.models import HealthEvent, MetricsLabels
__all__ = ["HealthEvent", "MetricsLabels"]
+23
View File
@@ -0,0 +1,23 @@
"""FDR-safe health and metrics metadata."""
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt
class HealthEvent(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
component: str = Field(min_length=1)
timestamp_ns: NonNegativeInt
liveness: Literal["alive", "failed"]
readiness: Literal["ready", "not_ready"]
dependency_state: dict[str, str] = Field(default_factory=dict)
class MetricsLabels(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
component: str = Field(min_length=1)
action: str = Field(min_length=1)
status: Literal["ok", "degraded", "failed"]
+14
View File
@@ -1 +1,15 @@
"""Clock-domain and timestamp helper namespace."""
from shared.time_sync.models import (
TimeSyncViolation,
TimeWindowResult,
check_monotonic_timestamps,
select_time_window,
)
__all__ = [
"TimeSyncViolation",
"TimeWindowResult",
"check_monotonic_timestamps",
"select_time_window",
]
+77
View File
@@ -0,0 +1,77 @@
"""Timestamp alignment helpers."""
from pydantic import BaseModel, ConfigDict, NonNegativeInt
class TimeSyncModel(BaseModel):
model_config = ConfigDict(extra="forbid", frozen=True)
class TimeSyncViolation(TimeSyncModel):
category: str
message: str
class TimeWindowResult(TimeSyncModel):
frame_timestamp_ns: NonNegativeInt
sample_timestamps_ns: tuple[NonNegativeInt, ...]
max_gap_ns: NonNegativeInt
jitter_ns: NonNegativeInt
violations: tuple[TimeSyncViolation, ...] = ()
@property
def ok(self) -> bool:
return not self.violations
def check_monotonic_timestamps(timestamps_ns: list[int]) -> tuple[TimeSyncViolation, ...]:
violations: list[TimeSyncViolation] = []
for previous, current in zip(timestamps_ns, timestamps_ns[1:]):
if current <= previous:
violations.append(
TimeSyncViolation(
category="timestamp_mismatch",
message="timestamps must be strictly increasing",
)
)
break
return tuple(violations)
def select_time_window(
frame_timestamp_ns: int,
sample_timestamps_ns: list[int],
tolerance_ns: int,
) -> TimeWindowResult:
if tolerance_ns < 0:
raise ValueError("tolerance_ns must be non-negative")
violations = list(check_monotonic_timestamps(sample_timestamps_ns))
selected = tuple(
timestamp
for timestamp in sample_timestamps_ns
if abs(timestamp - frame_timestamp_ns) <= tolerance_ns
)
if not selected:
violations.append(
TimeSyncViolation(
category="gap_exceeded",
message="no telemetry samples fall within the frame tolerance window",
)
)
gaps = [
current - previous
for previous, current in zip(sample_timestamps_ns, sample_timestamps_ns[1:])
if current > previous
]
max_gap_ns = max(gaps, default=0)
jitter_ns = max(gaps, default=0) - min(gaps, default=0) if gaps else 0
return TimeWindowResult(
frame_timestamp_ns=frame_timestamp_ns,
sample_timestamps_ns=selected,
max_gap_ns=max_gap_ns,
jitter_ns=jitter_ns,
violations=tuple(violations),
)