Files
gps-denied-onboard/src/gps_denied_onboard/helpers/engine_filename_schema.py
T
Oleksandr Bezdieniezhnykh 3acc7f33dd [AZ-270] [AZ-272] [AZ-279] [AZ-281] [AZ-283] Compose root + FDR schema + 3 Layer-1 helpers
AZ-270: composition root with strategy registry, tier-gated lookup,
topo-order construction, all-or-nothing teardown, StrategyNotLinkedError
payload.
AZ-272: orjson-backed FdrRecord serialise/parse with forward-compat for
unknown payload + top-level fields and canonical overrun-record shape.
AZ-279: pyproj-backed WGS84/ECEF/ENU + OSM slippy-map tile math with
WgsConversionError for shape/range/zoom guards.
AZ-281: strict EngineFilenameSchema build/parse/matches_host with
anchored regex + enum validation; round-trip identity by construction.
AZ-283: dtype-preserving (fp16/fp32) single + batch L2 normaliser with
zero-norm safety and descriptor_metric() source-of-truth.
pyproject.toml pins pyproj>=3.6 and orjson>=3.9 (named-backend deps per
the AZ-272 / AZ-279 contracts). New DTOs LatLonAlt + BoundingBox and
EngineCacheKey + HostCapabilities land in _types/ to back the helper
contracts.
203 unit tests pass (64 new). Review verdict: PASS_WITH_WARNINGS;
findings are perf-NFR deferrals + dep amendment + minor docstring polish.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 02:03:36 +03:00

128 lines
4.8 KiB
Python

"""Self-describing `.engine` filename schema (AZ-281 / D-C10-7).
Public surface frozen by
``_docs/02_document/contracts/shared_helpers/engine_filename_schema.md`` v1.0.0.
Filename format: ``{model}__sm{SM}_jp{JP_dotted}_trt{TRT_dotted}_{precision}.engine``
where ``model`` matches ``[a-z0-9_]+`` (no ``__``; ``build`` caps it at 64
characters), ``SM`` is an integer (``build`` requires it to be > 0), versions
are dotted ``<major>.<minor>``, and ``precision`` is one of ``fp16``, ``int8``,
``mixed``.
"""
from __future__ import annotations
import re
from typing import Final
from gps_denied_onboard._types.manifests import EngineCacheKey, HostCapabilities
# Names exported by ``from ... import *``; the underscore-prefixed regexes and
# validators below are intentionally left out.
__all__ = [
    "ALLOWED_PRECISIONS",
    "ENGINE_SUFFIX",
    "EngineFilenameSchema",
    "EngineFilenameSchemaError",
]
# Mandatory extension of every engine filename.
ENGINE_SUFFIX: Final[str] = ".engine"

# Closed set of precision tags accepted in the filename.
ALLOWED_PRECISIONS: Final[frozenset[str]] = frozenset({"fp16", "int8", "mixed"})

# The patterns below deliberately use ``[0-9]`` and ``\Z``:
#   * ``\d`` in a ``str`` pattern also matches non-ASCII decimal digits
#     (e.g. Arabic-Indic), which must never appear in a filename;
#   * ``$`` also matches just before a trailing newline, so a ``.match()``
#     against ``"name\n"`` would succeed; ``\Z`` only matches at the real end.

# Model segment: lowercase letters, digits and underscores.
_MODEL_RE: Final[re.Pattern[str]] = re.compile(r"^[a-z0-9_]+\Z")

# Dotted ``<major>.<minor>`` version (used for both JetPack and TensorRT).
_DOTTED_VERSION_RE: Final[re.Pattern[str]] = re.compile(r"^[0-9]+\.[0-9]+\Z")

# Whole filename: ``{model}__sm{SM}_jp{JP}_trt{TRT}_{precision}.engine``.
_FILENAME_RE: Final[re.Pattern[str]] = re.compile(
    r"^(?P<model>[a-z0-9_]+)__sm(?P<sm>[0-9]+)"
    r"_jp(?P<jetpack>[0-9]+\.[0-9]+)_trt(?P<trt>[0-9]+\.[0-9]+)_"
    r"(?P<precision>fp16|int8|mixed)\.engine\Z"
)
class EngineFilenameSchemaError(ValueError):
    """Signal an invalid argument or a malformed ``.engine`` filename (AZ-281).

    Subclasses ``ValueError``, so callers that already handle ``ValueError``
    also catch failures raised by ``build`` and ``parse``.
    """
class EngineFilenameSchema:
    """Stateless ``.engine`` filename builder / parser / host-match predicate.

    All members are static methods; the class only serves as a namespace.
    ``parse`` accepts exactly the filenames ``build`` can emit, so the two
    are inverses of each other.
    """

    @staticmethod
    def build(model_name: str, sm: int, jetpack: str, trt: str, precision: str) -> str:
        """Return the engine filename that encodes the given attributes.

        Args:
            model_name: Model segment; validated by ``_validate_model_name``.
            sm: Value written after ``sm``; validated by ``_validate_sm``.
            jetpack: Dotted ``<major>.<minor>`` version written after ``jp``.
            trt: Dotted ``<major>.<minor>`` version written after ``trt``.
            precision: One of ``ALLOWED_PRECISIONS``.

        Raises:
            EngineFilenameSchemaError: If any argument fails validation.
        """
        _validate_model_name(model_name)
        _validate_sm(sm)
        _validate_version(jetpack, "jetpack")
        _validate_version(trt, "trt")
        _validate_precision(precision)
        return f"{model_name}__sm{sm}_jp{jetpack}_trt{trt}_{precision}{ENGINE_SUFFIX}"

    @staticmethod
    def parse(filename: str) -> EngineCacheKey:
        """Decode *filename* into an ``EngineCacheKey``.

        Only filenames that ``build`` could have produced are accepted:
        non-ASCII input, an ``sm`` of zero or with leading zeros, and model
        segments that violate the ``build``-side rules are rejected.

        Raises:
            EngineFilenameSchemaError: If *filename* is not a ``str`` or does
                not follow the schema.
        """
        if not isinstance(filename, str):
            raise EngineFilenameSchemaError(f"parse expects str; got {type(filename).__name__}")
        if not filename.endswith(ENGINE_SUFFIX):
            raise EngineFilenameSchemaError(
                f"parse: filename must end with {ENGINE_SUFFIX!r}; got {filename!r}"
            )
        # ``\d`` in a str pattern also matches non-ASCII decimal digits, which
        # ``int()`` would happily convert; a schema filename is pure ASCII, so
        # anything else is treated as a format mismatch.
        match = _FILENAME_RE.match(filename) if filename.isascii() else None
        if not match:
            raise EngineFilenameSchemaError(
                f"parse: filename {filename!r} does not match the engine-schema format "
                "'{model}__sm{SM}_jp{JP}_trt{TRT}_{precision}.engine'"
            )
        model = match.group("model")
        if "__" in model:
            raise EngineFilenameSchemaError(
                f"parse: model segment {model!r} contains reserved separator '__'"
            )
        # Apply the remaining build-side model rules (notably the length cap)
        # so that parse never yields a key build would refuse.
        _validate_model_name(model)
        sm_text = match.group("sm")
        # build() writes sm as a positive int without padding; "0" and "087"
        # are spellings it can never emit, so accepting them would break the
        # one-filename-per-key invariant.
        if sm_text.startswith("0"):
            raise EngineFilenameSchemaError(
                f"parse: sm segment {sm_text!r} must be a positive integer without leading zeros"
            )
        return EngineCacheKey(
            model_name=model,
            sm=int(sm_text),
            jetpack=match.group("jetpack"),
            trt=match.group("trt"),
            precision=match.group("precision"),
        )

    @staticmethod
    def matches_host(filename: str, host_capabilities: HostCapabilities) -> bool:
        """Return whether *filename* targets the given host.

        The filename's ``sm``, ``jetpack`` and ``trt`` values are compared for
        equality with the same-named attributes of *host_capabilities*
        (versions are compared as strings). Model name and precision are not
        part of the comparison.

        Raises:
            EngineFilenameSchemaError: If *filename* cannot be parsed.
        """
        key = EngineFilenameSchema.parse(filename)
        return (
            key.sm == host_capabilities.sm
            and key.jetpack == host_capabilities.jetpack
            and key.trt == host_capabilities.trt
        )
def _validate_model_name(model_name: str) -> None:
    """Raise ``EngineFilenameSchemaError`` unless *model_name* is a legal model segment.

    A legal name is a non-empty ``str`` of at most 64 characters, made only of
    lowercase ASCII letters, digits and underscores, that does not contain the
    reserved ``__`` separator.
    """
    if not isinstance(model_name, str):
        raise EngineFilenameSchemaError(f"model_name must be str; got {type(model_name).__name__}")
    if not model_name:
        raise EngineFilenameSchemaError("model_name must be a non-empty string")
    if "__" in model_name:
        raise EngineFilenameSchemaError(
            f"model_name {model_name!r} contains reserved separator '__'"
        )
    # fullmatch, not match: a ``$`` anchor also matches just before a trailing
    # newline, so ``match`` would let "name\n" slip into the filename.
    if not _MODEL_RE.fullmatch(model_name):
        raise EngineFilenameSchemaError(
            f"model_name {model_name!r} must match [a-z0-9_]+ (lowercase, digits, underscores)"
        )
    if len(model_name) > 64:
        raise EngineFilenameSchemaError(f"model_name {model_name!r} exceeds 64-character limit")
def _validate_sm(sm: int) -> None:
    """Raise ``EngineFilenameSchemaError`` unless *sm* is a positive, non-bool ``int``."""
    # bool is a subclass of int, so it has to be excluded explicitly.
    is_plain_int = isinstance(sm, int) and not isinstance(sm, bool)
    if not is_plain_int:
        raise EngineFilenameSchemaError(f"sm must be a non-bool integer; got {sm!r}")
    if sm <= 0:
        raise EngineFilenameSchemaError(f"sm must be > 0; got {sm}")
def _validate_version(version: str, field_name: str) -> None:
    """Raise ``EngineFilenameSchemaError`` unless *version* is ``<major>.<minor>``.

    Args:
        version: Candidate version string, e.g. ``"6.0"``.
        field_name: Name used in the error message (``"jetpack"`` / ``"trt"``).
    """
    if not isinstance(version, str):
        raise EngineFilenameSchemaError(f"{field_name} must be str; got {type(version).__name__}")
    # isascii(): ``\d`` in a str pattern also matches non-ASCII decimal digits.
    # fullmatch(): a ``$`` anchor would otherwise tolerate a trailing newline.
    if not (version.isascii() and _DOTTED_VERSION_RE.fullmatch(version)):
        raise EngineFilenameSchemaError(
            f"{field_name} {version!r} must match dotted '<major>.<minor>' format"
        )
def _validate_precision(precision: str) -> None:
    """Raise ``EngineFilenameSchemaError`` unless *precision* is in ``ALLOWED_PRECISIONS``."""
    # Type guard first, as in the sibling validators: an unhashable argument
    # (e.g. a list) would otherwise make the frozenset lookup raise TypeError
    # instead of the schema error callers expect.
    if not isinstance(precision, str):
        raise EngineFilenameSchemaError(f"precision must be str; got {type(precision).__name__}")
    if precision not in ALLOWED_PRECISIONS:
        raise EngineFilenameSchemaError(
            f"precision {precision!r} not in allowed enum "
            f"{{{', '.join(sorted(ALLOWED_PRECISIONS))}}}"
        )