[AZ-263] Bootstrap: repo skeleton + Docker + CI + Alembic + Tier-1 tests

Implements the AZ-263 / E-BOOT initial structure task:

- Python src/-layout package `gps_denied_onboard/` with per-component
  interface stubs (14 components), type-only DTOs under `_types/`,
  shared helpers under `helpers/` (R14 LightGlue ownership), structured
  JSON logging, runtime composition root with env-var fail-fast gate,
  healthcheck module shared by Docker and CI smoke.
- CMake top-level + `cmake/{build_options,dependencies,strategies}.cmake`
  with the BUILD_* per-binary flags (ADR-002) and pinned external git
  refs for OKVIS2 / VINS-Mono / GTSAM / FAISS / OpenCV >=4.12.0.
- Three Dockerfiles (companion-tier1, operator-tooling,
  mock-suite-sat-service) + two compose files (dev + Tier-1 test).
- Four GitHub Actions workflows: ci.yml (lint/unit/integration/dual
  binary build/SBOM diff/security), ci-tier2.yml (self-hosted Jetson
  AC-bound NFTs), release.yml, cve-rescan.yml.
- Two CI gate scripts: `ci/sbom_diff.py` (deployment SBOM subset +
  R02 exclusion), `ci/opencv_pin_gate.py` (>=4.12.0 enforcement,
  D-CROSS-CVE-1).
- Alembic-driven Postgres 16 initial migration `0001_initial.py`
  mirroring satellite-provider tiles + flights + sector_classifications
  + manifests + engine_cache_entries (data_model.md s 2).
- Tier-1 test scaffolding: 95 passing unit tests covering every AC,
  per-component smoke tests, structured logging JSON output check,
  env-var gate check, healthcheck import check. Two CI-gated tests
  (cmake configure, actionlint) skip locally with explicit reasons.
- Batch report + code review report under `_docs/03_implementation/`.

Verdict: PASS_WITH_WARNINGS (two Low findings, both informational).
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 01:00:28 +03:00
parent 880eabcb3f
commit b12db61444
168 changed files with 3688 additions and 3 deletions
+3
View File
@@ -0,0 +1,3 @@
"""gps_denied_onboard — companion onboard system for GPS-denied UAV navigation."""
__version__ = "0.1.0"
@@ -0,0 +1 @@
"""Cross-component DTOs (type-only stubs)."""
@@ -0,0 +1,22 @@
"""Camera calibration DTO."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class CameraCalibration:
"""Camera intrinsics + distortion + body-to-camera + provenance.
Acquisition method is preserved so downstream estimators can tag estimates with
the calibration provenance per D-CROSS-CAL-1.
"""
camera_id: str
intrinsics_3x3: Any
distortion: Any
body_to_camera_se3: Any
acquisition_method: str
metadata: dict[str, Any] = field(default_factory=dict)
+19
View File
@@ -0,0 +1,19 @@
"""C8 outbound (FC-emitted) external-position DTO."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True)
class EmittedExternalPosition:
"""A single C8-emitted external-position datum (encoded per-FC at the adapter)."""
timestamp: datetime
latitude: float
longitude: float
altitude: float
horizontal_accuracy_m: float
vertical_accuracy_m: float
source_label: str
@@ -0,0 +1,32 @@
"""C10 manifest + engine-cache DTOs."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class Manifest:
"""C10 cache-provisioning manifest (D-C10-1 idempotence hash)."""
manifest_id: str
flight_id: str
created_at: datetime
content_hash: str
entries: tuple[Any, ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class EngineCacheEntry:
"""TensorRT engine + calibration cache, keyed by SM/JP/TRT/precision (D-C10-7)."""
engine_path: str
sm_arch: str
jetpack_version: str
tensorrt_version: str
precision: str
content_hash: str
int8_calibration_path: str | None = None
+18
View File
@@ -0,0 +1,18 @@
"""C3 cross-domain matching DTO."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class MatchResult:
"""Output of the cross-domain matcher (frame ↔ satellite tile)."""
query_frame_id: int
tile_id: str
keypoints_query: Any
keypoints_tile: Any
matches: Any
inlier_mask: Any | None = None
+69
View File
@@ -0,0 +1,69 @@
"""Navigation-side DTOs: camera frames, IMU samples, attitude, FC flight state, GPS health.
These are type-only stubs created by AZ-263 (Bootstrap). Concrete field semantics are
defined in `_docs/02_document/architecture.md § 4` and the C1 / C5 / C8 component
specs. Concrete subclasses are owned by the components that emit them; downstream
consumers depend on the DTOs declared here.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class NavCameraFrame:
"""A single nav-camera frame routed into the pipeline."""
frame_id: int
timestamp: datetime
image: Any
camera_calibration_id: str
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class ImuSample:
"""A single IMU sample (accel + gyro + timestamp)."""
timestamp: datetime
accel_xyz: tuple[float, float, float]
gyro_xyz: tuple[float, float, float]
@dataclass(frozen=True)
class ImuWindow:
"""A short window of IMU samples for preintegration."""
samples: tuple[ImuSample, ...]
t_start: datetime
t_end: datetime
@dataclass(frozen=True)
class AttitudeWindow:
"""Attitude samples over a time window (quaternion + timestamp)."""
quaternions: tuple[tuple[float, float, float, float], ...]
timestamps: tuple[datetime, ...]
@dataclass(frozen=True)
class FlightStateSignal:
"""Flight-controller-reported high-level state (armed, taking off, in flight, landed, …)."""
state: str
timestamp: datetime
@dataclass(frozen=True)
class GpsHealth:
"""FC-reported GPS health bundle (sats, hdop, fix type, spoofing-flag, …)."""
fix_type: int
satellites_visible: int
hdop: float
timestamp: datetime
spoofing_flag: bool = False
+42
View File
@@ -0,0 +1,42 @@
"""C4 PoseEstimator + C5 StateEstimator output DTOs."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class PoseEstimate:
"""A single 6-DoF pose estimate with covariance."""
frame_id: int
timestamp: datetime
pose_se3: Any
covariance_6x6: Any | None = None
covariance_mode: str = "marginals"
mre_px: float | None = None
@dataclass(frozen=True)
class EstimatorOutput:
"""C5 state-estimator output (smoothed pose + uncertainty + source label + health)."""
frame_id: int
timestamp: datetime
pose_se3: Any
covariance_6x6: Any | None = None
source_label: str = "visual_propagated"
health: EstimatorHealth | None = None
extras: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class EstimatorHealth:
"""C5 estimator health flags."""
last_anchor_age_ms: int = 0
imu_bias_norm: float = 0.0
vio_drift_proxy: float = 0.0
is_spoof_promoted: bool = False
+59
View File
@@ -0,0 +1,59 @@
"""C6 tile-cache DTOs."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class Tile:
"""A single satellite tile (image body + metadata)."""
tile_id: str
zoom_level: int
latitude: float
longitude: float
tile_size_meters: float
tile_size_pixels: int
image_path: str
@dataclass(frozen=True)
class TileQualityMetadata:
"""Quality metadata attached to an onboard-ingested tile (D-PROJ-2 ingest contract)."""
estimator_label: str
covariance_2x2: tuple[tuple[float, float], tuple[float, float]]
last_anchor_age_ms: int
mre_px: float
imu_bias_norm: float
@dataclass(frozen=True)
class TileRecord:
"""Postgres row for a tile (mirrors satellite-provider's canonical columns + additive)."""
tile_id: str
zoom_level: int
latitude: float
longitude: float
tile_size_meters: float
tile_size_pixels: int
source: str
voting_status: str = "trusted"
flight_id: str | None = None
companion_id: str | None = None
capture_timestamp: datetime | None = None
quality: TileQualityMetadata | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class SectorClassification:
"""Operator-set classification of a geographic sector (urban / forest / agriculture / …)."""
sector_id: str
classification: str
freshness_threshold_days: int
+21
View File
@@ -0,0 +1,21 @@
"""C1 VIO output DTO."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class VioOutput:
"""VIO pose + uncertainty + health bundle.
Concrete semantics in `_docs/02_document/components/01_c1_vio/description.md § 2`.
"""
frame_id: int
timestamp: datetime
pose_se3: Any
covariance_6x6: Any | None = None
health_flags: dict[str, Any] = field(default_factory=dict)
+35
View File
@@ -0,0 +1,35 @@
"""C2 VPR + C2.5 rerank DTOs."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class VprQuery:
"""A VPR query (global descriptor + frame metadata)."""
frame_id: int
timestamp: datetime
global_descriptor: Any
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class VprResult:
"""Top-K candidates from C2 retrieval."""
query_frame_id: int
candidate_tile_ids: tuple[str, ...]
scores: tuple[float, ...]
@dataclass(frozen=True)
class RerankResult:
"""C2.5 reranked set of candidate tiles."""
query_frame_id: int
candidate_tile_ids: tuple[str, ...]
inlier_counts: tuple[int, ...]
+1
View File
@@ -0,0 +1 @@
"""CLI entrypoints. `gps-denied-replay` lives in `replay.py` (AZ-402)."""
+19
View File
@@ -0,0 +1,19 @@
"""`gps-denied-replay` CLI entrypoint — STUB.
Owned by AZ-402. Bootstrap exposes a callable so `[project.scripts]` in
pyproject.toml resolves.
"""
from __future__ import annotations
import sys
def main(argv: list[str] | None = None) -> int:
"""Replay-CLI entrypoint."""
print("gps-denied-replay is not yet implemented (AZ-402 / E-DEMO-REPLAY)", file=sys.stderr)
return 2
if __name__ == "__main__":
raise SystemExit(main())
+9
View File
@@ -0,0 +1,9 @@
"""Clock interface + concrete implementations.
The interface is bootstrap-stubbed here. `WallClock` (live) and `TlogDerivedClock`
(replay) are owned by AZ-401 (E-DEMO-REPLAY).
"""
from gps_denied_onboard.clock.interface import Clock
__all__ = ["Clock"]
+20
View File
@@ -0,0 +1,20 @@
"""`Clock` Protocol.
R-DEMO-4: production C1-C5 paths bake real-time-cadence assumptions; injected
Clock lets replay mode trip those timers consistently against tlog timestamps.
Owned by AZ-401. Bootstrap ships the interface stub.
"""
from __future__ import annotations
from datetime import datetime
from typing import Protocol
class Clock(Protocol):
"""A monotonic clock abstraction."""
def now(self) -> datetime: ...
def monotonic(self) -> float: ...
@@ -0,0 +1 @@
"""Component subpackage — one folder per interface-first component (ADR-009)."""
@@ -0,0 +1,6 @@
"""C10 Cache Provisioning component — Public API."""
from gps_denied_onboard._types.manifests import EngineCacheEntry, Manifest
from gps_denied_onboard.components.c10_provisioning.interface import CacheProvisioner
__all__ = ["CacheProvisioner", "EngineCacheEntry", "Manifest"]
@@ -0,0 +1,18 @@
"""C10 `CacheProvisioner` Protocol.
Concrete impl: engine compile + descriptors + manifest + content-hash gate. See
`_docs/02_document/components/11_c10_provisioning/`.
"""
from __future__ import annotations
from pathlib import Path
from typing import Protocol
from gps_denied_onboard._types.manifests import Manifest
class CacheProvisioner(Protocol):
"""Pre-flight cache provisioning (engine compile + descriptor batch + manifest)."""
def provision(self, flight_id: str, output_root: Path) -> Manifest: ...
@@ -0,0 +1,8 @@
"""C11 Tile Manager component — Public API."""
from gps_denied_onboard.components.c11_tile_manager.interface import (
TileDownloader,
TileUploader,
)
__all__ = ["TileDownloader", "TileUploader"]
@@ -0,0 +1,27 @@
"""C11 `TileDownloader` + `TileUploader` Protocols.
Operator-side ONLY — excluded from airborne via CMake (`BUILD_C11_TILE_MANAGER=OFF`).
See `_docs/02_document/components/12_c11_tilemanager/`.
"""
from __future__ import annotations
from collections.abc import Iterable
from pathlib import Path
from typing import Protocol
from gps_denied_onboard._types.tile import TileRecord
class TileDownloader(Protocol):
"""Pre-flight tile download from `satellite-provider`."""
def download(
self, lat_lon_box: tuple[float, float, float, float], zoom: int, output_root: Path
) -> Iterable[TileRecord]: ...
class TileUploader(Protocol):
"""Post-landing batch upload to the `satellite-provider` ingest endpoint (D-PROJ-2)."""
def upload(self, tiles: Iterable[TileRecord], flight_id: str) -> None: ...
@@ -0,0 +1,8 @@
"""C12 Operator Pre-flight Tooling component — Public API."""
from gps_denied_onboard.components.c12_operator_tooling.interface import (
CacheBuildWorkflow,
OperatorReLocService,
)
__all__ = ["CacheBuildWorkflow", "OperatorReLocService"]
@@ -0,0 +1,21 @@
"""C12 `CacheBuildWorkflow` + `OperatorReLocService` Protocols.
See `_docs/02_document/components/13_c12_operator_tooling/`.
"""
from __future__ import annotations
from pathlib import Path
from typing import Protocol
class CacheBuildWorkflow(Protocol):
"""Operator CLI workflow that orchestrates C11 download → C10 provisioning."""
def run(self, flight_id: str, output_root: Path) -> None: ...
class OperatorReLocService(Protocol):
"""Operator-side re-localization request service (GUI deferred per epic)."""
def request_relocalization(self, flight_id: str, hint: dict) -> None: ...
@@ -0,0 +1,5 @@
"""C13 FDR Writer component — Public API."""
from gps_denied_onboard.components.c13_fdr.interface import FdrWriter
__all__ = ["FdrWriter"]
@@ -0,0 +1,21 @@
"""C13 `FdrWriter` Protocol (consumer side).
Producer-side `FdrClient` lives in `gps_denied_onboard.fdr_client` (cross-cutting,
AZ-247); this consumer-side writer is owned by AZ-248 / E-C13.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard.fdr_client.records import FdrRecord
class FdrWriter(Protocol):
"""FDR consumer: writer thread + segment rotation + ≤64 GB capacity cap."""
def start(self) -> None: ...
def stop(self) -> None: ...
def consume(self, record: FdrRecord) -> None: ...
@@ -0,0 +1,6 @@
"""C1 VIO component — Public API."""
from gps_denied_onboard._types.vio import VioOutput
from gps_denied_onboard.components.c1_vio.interface import VioStrategy
__all__ = ["VioOutput", "VioStrategy"]
@@ -0,0 +1,4 @@
"""pybind11 wrappers for `cpp/okvis2/`, `cpp/vins_mono/`, `cpp/klt_ransac/`.
Placeholders shipped by AZ-263; real wrappers land with the concrete strategies.
"""
@@ -0,0 +1,20 @@
"""C1 `VioStrategy` Protocol.
Concrete strategies: OKVIS2 (default), VINS-Mono (research-only), KLT/RANSAC
(mandatory simple baseline). See `_docs/02_document/components/01_c1_vio/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame
from gps_denied_onboard._types.vio import VioOutput
class VioStrategy(Protocol):
"""Visual-Inertial-Odometry strategy."""
def step(self, frame: NavCameraFrame, imu: ImuWindow) -> VioOutput:
"""Process a single nav-camera frame + IMU window and return a VIO update."""
...
@@ -0,0 +1,6 @@
"""C2.5 Rerank component — Public API."""
from gps_denied_onboard._types.vpr import RerankResult
from gps_denied_onboard.components.c2_5_rerank.interface import RerankStrategy
__all__ = ["RerankResult", "RerankStrategy"]
@@ -0,0 +1,17 @@
"""C2.5 `RerankStrategy` Protocol.
Default: `InlierBasedReranker` (single-pair LightGlue inlier counter, K=10 → N=3).
See `_docs/02_document/components/03_c2_5_rerank/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.vpr import RerankResult, VprResult
class RerankStrategy(Protocol):
"""Re-rank C2's top-K candidates down to N via cross-domain match scoring."""
def rerank(self, vpr_result: VprResult, n_keep: int = 3) -> RerankResult: ...
@@ -0,0 +1,6 @@
"""C2 VPR component — Public API."""
from gps_denied_onboard._types.vpr import VprQuery, VprResult
from gps_denied_onboard.components.c2_vpr.interface import VprStrategy
__all__ = ["VprQuery", "VprResult", "VprStrategy"]
@@ -0,0 +1 @@
"""Native bindings for VPR runtime — placeholder."""
@@ -0,0 +1,17 @@
"""C2 `VprStrategy` Protocol.
Concrete strategies: UltraVPR (primary), MegaLoc, MixVPR, SelaVPR, EigenPlaces,
NetVLAD, SALAD. See `_docs/02_document/components/02_c2_vpr/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.vpr import VprQuery, VprResult
class VprStrategy(Protocol):
"""Visual Place Recognition strategy: encode → retrieve top-K candidates."""
def retrieve(self, query: VprQuery, top_k: int = 10) -> VprResult: ...
@@ -0,0 +1,5 @@
"""C3.5 AdHoP Refinement component — Public API."""
from gps_denied_onboard.components.c3_5_adhop.interface import AdHoPRefinementStrategy
__all__ = ["AdHoPRefinementStrategy"]
@@ -0,0 +1,16 @@
"""C3.5 `AdHoPRefinementStrategy` Protocol.
Concrete impl: AdHoP refiner. See `_docs/02_document/components/05_c3_5_adhop/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.matching import MatchResult
class AdHoPRefinementStrategy(Protocol):
"""Conditional refinement of a `MatchResult` (geometric verification + outlier purge)."""
def refine(self, match: MatchResult) -> MatchResult: ...
@@ -0,0 +1,6 @@
"""C3 Cross-Domain Matcher component — Public API."""
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard.components.c3_matcher.interface import CrossDomainMatcher
__all__ = ["CrossDomainMatcher", "MatchResult"]
@@ -0,0 +1 @@
"""Native bindings for the cross-domain matcher runtime — placeholder."""
@@ -0,0 +1,19 @@
"""C3 `CrossDomainMatcher` Protocol.
Concrete impls: DISK+LightGlue (primary), ALIKED+LightGlue, XFeat. See
`_docs/02_document/components/04_c3_matcher/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.tile import Tile
class CrossDomainMatcher(Protocol):
"""Match a nav-camera frame against a satellite tile."""
def match(self, frame: NavCameraFrame, tile: Tile) -> MatchResult: ...
@@ -0,0 +1,6 @@
"""C4 Pose Estimator component — Public API."""
from gps_denied_onboard._types.pose import EstimatorOutput, PoseEstimate
from gps_denied_onboard.components.c4_pose.interface import PoseEstimator
__all__ = ["EstimatorOutput", "PoseEstimate", "PoseEstimator"]
@@ -0,0 +1 @@
"""pybind11 wrapper for `cpp/gtsam_bindings/` (READ-ONLY consumer; primary owner is c5_state)."""
@@ -0,0 +1,18 @@
"""C4 `PoseEstimator` Protocol.
Concrete impl: OpenCV `solvePnPRansac` + GTSAM Marginals. See
`_docs/02_document/components/06_c4_pose/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.matching import MatchResult
from gps_denied_onboard._types.pose import PoseEstimate
class PoseEstimator(Protocol):
"""Estimate a 6-DoF pose from a verified cross-domain match."""
def estimate(self, match: MatchResult) -> PoseEstimate: ...
@@ -0,0 +1,6 @@
"""C5 State Estimator component — Public API."""
from gps_denied_onboard._types.pose import EstimatorHealth, EstimatorOutput
from gps_denied_onboard.components.c5_state.interface import StateEstimator
__all__ = ["EstimatorHealth", "EstimatorOutput", "StateEstimator"]
@@ -0,0 +1 @@
"""pybind11 wrapper for `cpp/gtsam_bindings/` (primary owner; also used READ-ONLY by c4_pose)."""
@@ -0,0 +1,23 @@
"""C5 `StateEstimator` Protocol.
Concrete impls: `GtsamIsam2StateEstimator` (production-default; iSAM2 +
IncrementalFixedLagSmoother), `EskfStateEstimator` (mandatory simple baseline).
See `_docs/02_document/components/07_c5_state/`.
"""
from __future__ import annotations
from typing import Protocol
from gps_denied_onboard._types.pose import EstimatorOutput, PoseEstimate
from gps_denied_onboard._types.vio import VioOutput
class StateEstimator(Protocol):
"""Smoothed state estimator (fuses VIO + satellite anchors + IMU)."""
def add_vio(self, vio: VioOutput) -> None: ...
def add_pose_anchor(self, anchor: PoseEstimate) -> None: ...
def latest_output(self) -> EstimatorOutput | None: ...
@@ -0,0 +1,21 @@
"""C6 Tile Cache & Vector Index component — Public API."""
from gps_denied_onboard._types.tile import (
SectorClassification,
Tile,
TileQualityMetadata,
TileRecord,
)
from gps_denied_onboard.components.c6_tile_cache.interface import (
DescriptorIndex,
TileStore,
)
__all__ = [
"DescriptorIndex",
"SectorClassification",
"Tile",
"TileQualityMetadata",
"TileRecord",
"TileStore",
]
@@ -0,0 +1 @@
"""pybind11 wrapper for `cpp/faiss_index/` — placeholder."""
@@ -0,0 +1,32 @@
"""C6 `TileStore` + `DescriptorIndex` Protocols.
Concrete impl: `PostgresFilesystemStore` (Postgres mirror + filesystem mmap +
FAISS HNSW). See `_docs/02_document/components/08_c6_tile_cache/`.
"""
from __future__ import annotations
from collections.abc import Iterable
from typing import Protocol
from gps_denied_onboard._types.tile import Tile, TileRecord
class TileStore(Protocol):
"""Tile metadata + body store (mirrors satellite-provider; cached locally)."""
def get(self, tile_id: str) -> Tile | None: ...
def query_by_lat_lon(
self, lat: float, lon: float, zoom: int, radius_m: float
) -> Iterable[TileRecord]: ...
def put(self, record: TileRecord) -> None: ...
class DescriptorIndex(Protocol):
"""Vector index over tile descriptors (FAISS HNSW concrete impl)."""
def add(self, tile_id: str, descriptor) -> None: ...
def search(self, descriptor, top_k: int) -> Iterable[tuple[str, float]]: ...
@@ -0,0 +1,6 @@
"""C7 Inference Runtime component — Public API."""
from gps_denied_onboard._types.manifests import EngineCacheEntry
from gps_denied_onboard.components.c7_inference.interface import InferenceRuntime
__all__ = ["EngineCacheEntry", "InferenceRuntime"]
@@ -0,0 +1,18 @@
"""C7 `InferenceRuntime` Protocol.
Concrete impls: `TensorrtRuntime` (production-default; TensorRT 10.3),
`OnnxTrtEpRuntime` (ONNX Runtime + TensorRT EP), `PytorchFp16Runtime` (research
baseline). See `_docs/02_document/components/09_c7_inference/`.
"""
from __future__ import annotations
from typing import Any, Protocol
class InferenceRuntime(Protocol):
"""Compiled-engine inference runtime."""
def infer(self, inputs: Any) -> Any: ...
def load(self, engine_path: str) -> None: ...
@@ -0,0 +1,10 @@
"""C8 FC + GCS Adapter component — Public API."""
from gps_denied_onboard._types.emitted import EmittedExternalPosition
from gps_denied_onboard.components.c8_fc_adapter.interface import (
FcAdapter,
GcsAdapter,
ReplaySink,
)
__all__ = ["EmittedExternalPosition", "FcAdapter", "GcsAdapter", "ReplaySink"]
@@ -0,0 +1,47 @@
"""C8 Adapter Protocols: `FcAdapter`, `GcsAdapter`, `ReplaySink`.
Concrete impls: `PymavlinkArdupilotAdapter`, `Msp2InavAdapter`,
`MavlinkGcsAdapter`, `TlogReplayFcAdapter`, `JsonlReplaySink`. See
`_docs/02_document/components/10_c8_fc_adapter/`.
"""
from __future__ import annotations
from collections.abc import Iterator
from typing import Protocol
from gps_denied_onboard._types.emitted import EmittedExternalPosition
from gps_denied_onboard._types.nav import (
AttitudeWindow,
FlightStateSignal,
GpsHealth,
ImuSample,
)
class FcAdapter(Protocol):
"""Bidirectional flight-controller adapter."""
def outbound(self, position: EmittedExternalPosition) -> None: ...
def inbound_imu(self) -> Iterator[ImuSample]: ...
def inbound_attitude(self) -> Iterator[AttitudeWindow]: ...
def inbound_gps_health(self) -> Iterator[GpsHealth]: ...
def inbound_flight_state(self) -> Iterator[FlightStateSignal]: ...
class GcsAdapter(Protocol):
"""Ground-control-station adapter (telemetry + operator commands)."""
def emit_summary(self, summary: dict) -> None: ...
def operator_commands(self) -> Iterator[dict]: ...
class ReplaySink(Protocol):
"""Replay-mode estimate sink (e.g. JSONL writer)."""
def write(self, estimate: dict) -> None: ...
@@ -0,0 +1,9 @@
"""Config loader + dataclass schemas (owned by AZ-269 / E-CC-CONF).
Bootstrap creates importable stubs so every component constructor can take a
config argument from day one.
"""
from gps_denied_onboard.config.schema import RuntimeConfig
__all__ = ["RuntimeConfig"]
+16
View File
@@ -0,0 +1,16 @@
"""Config loader — STUB.
Concrete YAML + env-var loader is owned by AZ-269. Bootstrap exposes the load
function as `NotImplementedError` so callers fail loudly until AZ-269 lands.
"""
from __future__ import annotations
from pathlib import Path
from gps_denied_onboard.config.schema import RuntimeConfig
def load_runtime_config(yaml_path: Path) -> RuntimeConfig:
"""Load a `RuntimeConfig` from a YAML file + environment overlay."""
raise NotImplementedError("Config loader concrete impl is AZ-269 (E-CC-CONF)")
+26
View File
@@ -0,0 +1,26 @@
"""Config dataclass schemas — STUB.
Concrete YAML schema is owned by AZ-269. Bootstrap declares only the runtime-level
config container so the composition root can type its `compose_*` signatures.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class RuntimeConfig:
"""Runtime configuration loaded from YAML + env vars.
The concrete field set is filled in by AZ-269. This stub is enough for the
composition root + tests to import the type.
"""
fc_profile: str = "ardupilot_plane"
tier: int = 1
db_url: str = ""
log_level: str = "INFO"
log_sink: str = "console"
extras: dict[str, Any] = field(default_factory=dict)
@@ -0,0 +1,10 @@
"""FDR producer client (E-CC-FDR-CLIENT / AZ-247).
Producer-side API used by every component. Consumer-side writer lives in
`gps_denied_onboard.components.c13_fdr` (AZ-248 / E-C13).
"""
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import FdrRecord
__all__ = ["FdrClient", "FdrRecord"]
@@ -0,0 +1,16 @@
"""FDR producer-side client API — STUB.
Concrete client is owned by AZ-273. Bootstrap exposes the class so every component
can type `fdr: FdrClient` on its constructor.
"""
from __future__ import annotations
from gps_denied_onboard.fdr_client.records import FdrRecord
class FdrClient:
"""Producer-side FDR API: enqueue records, drop-oldest on overrun."""
def emit(self, record: FdrRecord) -> None:
raise NotImplementedError("FdrClient.emit concrete impl is AZ-273")
@@ -0,0 +1,21 @@
"""Lock-free SPSC ring buffer — STUB.
Concrete drop-oldest-on-overrun ring buffer is owned by AZ-273.
"""
from __future__ import annotations
from typing import Any
class SpscRingBuffer:
"""Single-producer single-consumer lock-free ring buffer."""
def __init__(self, capacity: int) -> None:
self.capacity = capacity
def push(self, item: Any) -> bool:
raise NotImplementedError("FdrClient ring-buffer concrete impl is AZ-273")
def pop(self) -> Any | None:
raise NotImplementedError("FdrClient ring-buffer concrete impl is AZ-273")
@@ -0,0 +1,25 @@
"""FDR record schema — STUB.
Concrete schema (estimates / IMU / MAVLink / health / tile / thumbnail discriminated
record types) is owned by AZ-272. Bootstrap declares the umbrella DTO so every
producer can import it.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class FdrRecord:
"""A single FDR record (record-type discriminator + payload).
The full discriminated-union of record types is defined by AZ-272.
"""
record_type: str
timestamp: datetime
producer: str
payload: dict[str, Any] = field(default_factory=dict)
@@ -0,0 +1,9 @@
"""FrameSource interface + concrete implementations.
The interface is bootstrap-stubbed here. `LiveCameraFrameSource` and
`VideoFileFrameSource` are owned by AZ-398.
"""
from gps_denied_onboard.frame_source.interface import FrameSource
__all__ = ["FrameSource"]
@@ -0,0 +1,18 @@
"""`FrameSource` Protocol.
Owned by AZ-398 (E-DEMO-REPLAY) for the formalisation; bootstrap ships the
interface stub so C1 can be constructor-injected against it.
"""
from __future__ import annotations
from collections.abc import Iterator
from typing import Protocol
from gps_denied_onboard._types.nav import NavCameraFrame
class FrameSource(Protocol):
"""A source of `NavCameraFrame` instances."""
def frames(self) -> Iterator[NavCameraFrame]: ...
+42
View File
@@ -0,0 +1,42 @@
"""Bootstrap healthcheck callable.
Used by both `companion-tier1` and `operator-tooling` Dockerfiles via
`HEALTHCHECK CMD python -m gps_denied_onboard.healthcheck`. Returns a non-zero exit
code on any failure so Docker's healthcheck loop marks the container unhealthy.
AC-6 (Bootstrap / AZ-263): this module must be importable and runnable.
"""
from __future__ import annotations
import sys
def check() -> int:
"""Quick liveness check: all foundation packages must be importable.
Returns 0 on success, non-zero exit code on failure.
"""
try:
import gps_denied_onboard # noqa: F401
from gps_denied_onboard import _types # noqa: F401
from gps_denied_onboard._types import ( # noqa: F401
calibration,
emitted,
manifests,
matching,
nav,
pose,
tile,
vio,
vpr,
)
from gps_denied_onboard.logging import get_logger # noqa: F401
except ImportError as exc:
print(f"healthcheck: ImportError: {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(check())
@@ -0,0 +1,5 @@
"""Shared utilities (owned by AZ-264 / E-CC-HELPERS).
Bootstrap (AZ-263) creates these as importable stubs; concrete implementations are
filled in by per-helper tasks under AZ-264. See `_docs/02_document/common-helpers/`.
"""
@@ -0,0 +1,14 @@
"""Descriptor-normalisation utility — STUB.
Concrete impl owned by AZ-283. Contract:
`_docs/02_document/common-helpers/08_helper_descriptor_normaliser.md`.
"""
from __future__ import annotations
from typing import Any
def l2_normalise(descriptors: Any) -> Any:
"""L2-normalise a (N, D) descriptor matrix in-place semantics."""
raise NotImplementedError("descriptor_normaliser concrete impl is AZ-283")
@@ -0,0 +1,28 @@
"""TensorRT engine filename schema — STUB.
D-C10-7 self-describing engine names. Concrete impl owned by AZ-281. Contract:
`_docs/02_document/common-helpers/06_helper_engine_filename_schema.md`.
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class EngineFilename:
"""Parsed parts of a self-describing engine filename."""
model_name: str
sm_arch: str
jetpack_version: str
tensorrt_version: str
precision: str
content_hash: str
def render(self) -> str:
raise NotImplementedError("engine_filename_schema concrete impl is AZ-281")
@classmethod
def parse(cls, filename: str) -> EngineFilename:
raise NotImplementedError("engine_filename_schema concrete impl is AZ-281")
@@ -0,0 +1,16 @@
"""IMU preintegration helper — STUB.
Concrete implementation is owned by AZ-276 (E-CC-HELPERS). Contract lives at
`_docs/02_document/common-helpers/01_helper_imu_preintegrator.md`.
"""
from __future__ import annotations
from typing import Any
class ImuPreintegrator:
"""Preintegrate IMU samples over a time window for VIO / state-estimator factor adds."""
def preintegrate(self, samples: Any) -> Any:
raise NotImplementedError("ImuPreintegrator concrete impl is AZ-276 (E-CC-HELPERS)")
@@ -0,0 +1,19 @@
"""Shared LightGlue inference runtime — STUB.
R14 fix: this helper is the single owner; both C2.5 (single-pair inlier counter)
and C3 (matcher) import it. Neither component depends on the other.
Concrete implementation is owned by AZ-278. Contract:
`_docs/02_document/common-helpers/03_helper_lightglue_runtime.md`.
"""
from __future__ import annotations
from typing import Any
class LightGlueRuntime:
"""Shared LightGlue matcher runtime."""
def match(self, descriptors_a: Any, descriptors_b: Any) -> Any:
raise NotImplementedError("LightGlueRuntime concrete impl is AZ-278")
@@ -0,0 +1,14 @@
"""Generic RANSAC inlier filter — STUB.
Concrete impl owned by AZ-282. Contract:
`_docs/02_document/common-helpers/07_helper_ransac_filter.md`.
"""
from __future__ import annotations
from typing import Any
def filter_inliers(matches: Any, threshold_px: float, max_iters: int = 1000) -> Any:
"""Run RANSAC on a set of point matches and return the inlier mask."""
raise NotImplementedError("ransac_filter concrete impl is AZ-282")
@@ -0,0 +1,29 @@
"""SE(3) utility helpers — STUB.
Concrete implementation is owned by AZ-277. Contract:
`_docs/02_document/common-helpers/02_helper_se3_utils.md`.
"""
from __future__ import annotations
from typing import Any
def compose(a_se3: Any, b_se3: Any) -> Any:
"""Compose two SE(3) transforms."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
def inverse(t_se3: Any) -> Any:
"""Invert an SE(3) transform."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
def log_map(t_se3: Any) -> Any:
"""SE(3) → se(3) log map (returns a 6-vector)."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
def exp_map(xi_6: Any) -> Any:
"""se(3) → SE(3) exp map (consumes a 6-vector)."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
@@ -0,0 +1,19 @@
"""Content-hash sidecar helper — STUB.
D-C10-3 content-hash gate. Concrete impl owned by AZ-280. Contract:
`_docs/02_document/common-helpers/05_helper_sha256_sidecar.md`.
"""
from __future__ import annotations
from pathlib import Path
def compute_sidecar(target_path: Path) -> Path:
"""Compute SHA-256 of `target_path` and write a sidecar file next to it."""
raise NotImplementedError("sha256_sidecar concrete impl is AZ-280")
def verify_sidecar(target_path: Path) -> bool:
"""Verify that the sidecar matches the file content."""
raise NotImplementedError("sha256_sidecar concrete impl is AZ-280")
@@ -0,0 +1,26 @@
"""WGS84 ↔ local-tangent-plane converter — STUB.
Concrete implementation is owned by AZ-279. Contract:
`_docs/02_document/common-helpers/04_helper_wgs_converter.md`.
"""
from __future__ import annotations
def wgs84_to_ltp(
lat_deg: float,
lon_deg: float,
alt_m: float,
ref_lat_deg: float,
ref_lon_deg: float,
ref_alt_m: float,
) -> tuple[float, float, float]:
"""Convert a WGS-84 lat/lon/alt to local-tangent-plane east/north/up metres."""
raise NotImplementedError("wgs_converter concrete impl is AZ-279")
def ltp_to_wgs84(
e_m: float, n_m: float, u_m: float, ref_lat_deg: float, ref_lon_deg: float, ref_alt_m: float
) -> tuple[float, float, float]:
"""Inverse of wgs84_to_ltp."""
raise NotImplementedError("wgs_converter concrete impl is AZ-279")
@@ -0,0 +1,9 @@
"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245).
Bootstrap (AZ-263) ships a working `get_logger` so every other module can import it;
the concrete sink + redaction policy is layered on by AZ-266.
"""
from gps_denied_onboard.logging.structured import get_logger
__all__ = ["get_logger"]
@@ -0,0 +1,83 @@
"""Structured JSON logging.
E-CC-LOG / AZ-245 contract: one JSON object per log line. Bootstrap provides a
minimal working `get_logger(name)` so every other module can import it; AZ-266
will add full redaction and the FDR sink.
"""
from __future__ import annotations
import json
import logging
import os
import sys
import time
from typing import Any
class _JsonFormatter(logging.Formatter):
"""Emit a single JSON object per log line — no narrative log lines (E-CC-LOG)."""
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created))
+ f".{int(record.msecs):03d}Z",
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
for key, value in record.__dict__.items():
if key in (
"args",
"msg",
"levelname",
"name",
"exc_info",
"exc_text",
"stack_info",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"module",
"funcName",
"filename",
"pathname",
"lineno",
"levelno",
):
continue
payload.setdefault(key, value)
return json.dumps(payload, separators=(",", ":"), default=str)
_CONFIGURED = False
def _configure_root_once() -> None:
global _CONFIGURED
if _CONFIGURED:
return
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(_JsonFormatter())
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
level_name = os.getenv("LOG_LEVEL", "INFO").upper()
root.setLevel(getattr(logging, level_name, logging.INFO))
_CONFIGURED = True
def get_logger(name: str) -> logging.Logger:
"""Return a structured JSON logger.
Every component imports its logger via
`from gps_denied_onboard.logging import get_logger`.
"""
_configure_root_once()
return logging.getLogger(name)
+89
View File
@@ -0,0 +1,89 @@
"""Composition root (ADR-009 interface-first DI).
The only place that may import concrete strategy implementations across
components. Per-binary `compose_*` entrypoints select the strategy graph for that
binary (airborne / research / operator-tooling / replay-cli) — gated by CMake
`BUILD_*` flags at compile time and validated again here at startup.
Bootstrap (AZ-263) ships the entrypoints as stubs that perform the required env-var
fail-fast (AC-8). Per-component wiring is added by each component's "wire-in"
implementation task.
"""
from __future__ import annotations
import os
import sys
from collections.abc import Iterable
from dataclasses import dataclass
class ConfigurationError(RuntimeError):
"""Raised when a required environment variable is missing or a strategy whose
CMake `BUILD_*` flag is OFF would otherwise be wired."""
REQUIRED_ENV_VARS: tuple[str, ...] = (
"GPS_DENIED_FC_PROFILE",
"GPS_DENIED_TIER",
"DB_URL",
"CAMERA_CALIBRATION_PATH",
"LOG_LEVEL",
"LOG_SINK",
"INFERENCE_BACKEND",
"FDR_PATH",
"TILE_CACHE_PATH",
)
def _check_required_env(extra_required: Iterable[str] = ()) -> None:
"""Fail fast with a clear pointer at the first missing required env var.
AC-8 (Bootstrap / AZ-263).
"""
missing = [name for name in (*REQUIRED_ENV_VARS, *extra_required) if name not in os.environ]
if missing:
raise ConfigurationError(
"Missing required environment variable(s): "
+ ", ".join(missing)
+ ". See `.env.example` for the full list."
)
@dataclass(frozen=True)
class RuntimeRoot:
"""Composed runtime graph (placeholder; per-component wiring is per-task)."""
binary: str
profile: str
def compose_root(yaml_config_path: str | None = None) -> RuntimeRoot:
"""Compose the airborne runtime graph."""
_check_required_env(extra_required=("MAVLINK_SIGNING_KEY",))
return RuntimeRoot(binary="airborne", profile=os.environ["GPS_DENIED_FC_PROFILE"])
def compose_operator(yaml_config_path: str | None = None) -> RuntimeRoot:
"""Compose the operator-tooling runtime graph (pre-flight + post-landing)."""
_check_required_env(extra_required=("SATELLITE_PROVIDER_URL",))
return RuntimeRoot(binary="operator-tooling", profile=os.environ["GPS_DENIED_FC_PROFILE"])
def compose_replay(yaml_config_path: str | None = None) -> RuntimeRoot:
"""Compose the replay-cli runtime graph. Concrete wiring owned by AZ-401."""
_check_required_env()
return RuntimeRoot(binary="replay-cli", profile=os.environ["GPS_DENIED_FC_PROFILE"])
def main() -> int: # pragma: no cover — guarded entrypoint
try:
compose_root()
except ConfigurationError as exc:
print(f"runtime_root: {exc}", file=sys.stderr)
return 2
return 0
if __name__ == "__main__":
raise SystemExit(main())