"""`load_config` — the single entrypoint that materialises `Config` at startup. Implements the `composition_root_protocol` contract v1.0.0 (E-CC-CONF / AZ-269 / AZ-246). Precedence (highest -> lowest): 1. Environment variables (``env`` argument). 2. YAML files (``paths``), in order — later paths override earlier ones. 3. Documented defaults baked into the cross-cutting dataclasses. The returned `Config` is frozen end-to-end. Required env vars that fail to resolve raise `RequiredFieldMissingError` with the name of the offending variable and a pointer at ``.env.example``. """ from __future__ import annotations from collections.abc import Mapping, Sequence from pathlib import Path from typing import Any, Final import yaml from gps_denied_onboard.config.schema import ( _COMPONENT_REGISTRY, Config, ConfigError, FcConfig, FdrConfig, GcsConfig, LogConfig, ReplayAutoSyncConfig, ReplayConfig, RequiredFieldMissingError, RuntimeConfig, _replace_block, _resolve_component_blocks, ) __all__ = ["ENV_KEY_MAP", "load_config"] # Env-var -> (block, field) mapping. The composition root reads env vars # through this table so the YAML path and the env path stay in sync. 
# Keys are environment-variable names; each value is the ``(block, field)``
# pair that the variable overrides in the layered config. Only names listed
# here are overlaid by `_apply_env_overrides`; ``MODE`` is not in this table
# and is read separately inside `load_config`.
ENV_KEY_MAP: Final[dict[str, tuple[str, str]]] = {
    # Cross-cutting blocks
    "GPS_DENIED_FC_PROFILE": ("runtime", "fc_profile"),
    "GPS_DENIED_TIER": ("runtime", "tier"),
    "DB_URL": ("runtime", "db_url"),
    "CAMERA_CALIBRATION_PATH": ("runtime", "camera_calibration_path"),
    "INFERENCE_BACKEND": ("runtime", "inference_backend"),
    "TILE_CACHE_PATH": ("runtime", "tile_cache_path"),
    "LOG_LEVEL": ("log", "level"),
    "LOG_TIER": ("log", "tier"),
    "LOG_SINK": ("log", "sink"),
    "FDR_PATH": ("fdr", "path"),
    "FDR_QUEUE_SIZE": ("fdr", "queue_size"),
    # C8 FC + GCS adapter blocks (AZ-390)
    "FC_ADAPTER": ("fc", "adapter"),
    "FC_PORT_DEVICE": ("fc", "port_device"),
    "FC_PORT_BAUD": ("fc", "port_baud"),
    "FC_SIGNING_KEY_SOURCE": ("fc", "signing_key_source"),
    "FC_DEV_STATIC_SIGNING_KEY": ("fc", "dev_static_signing_key"),
    "FC_SIGNING_FAILURE_THRESHOLD": ("fc", "signing_failure_threshold"),
    "FC_SPOOF_RECOVERY_SOURCE_SET": ("fc", "spoof_recovery_source_set"),
    "FC_SOURCE_SET_SWITCH_TIMEOUT_MS": ("fc", "source_set_switch_timeout_ms"),
    "GCS_ADAPTER": ("gcs", "adapter"),
    "GCS_PORT_DEVICE": ("gcs", "port_device"),
    "GCS_PORT_BAUD": ("gcs", "port_baud"),
    "GCS_SUMMARY_RATE_HZ": ("gcs", "summary_rate_hz"),
    # Replay block (AZ-401)
    "REPLAY_VIDEO_PATH": ("replay", "video_path"),
    "REPLAY_TLOG_PATH": ("replay", "tlog_path"),
    "REPLAY_OUTPUT_PATH": ("replay", "output_path"),
    "REPLAY_PACE": ("replay", "pace"),
    "REPLAY_TIME_OFFSET_MS": ("replay", "time_offset_ms"),
    "REPLAY_TARGET_FC_DIALECT": ("replay", "target_fc_dialect"),
}

# Env vars that MUST resolve to a non-empty value before `load_config`
# can return (per AZ-263 AC-8 + AZ-269 AC-6). Missing values trigger
# `RequiredFieldMissingError` with the variable name in the message.
# The tuple order is the order in which missing names are listed in that
# message.
_REQUIRED_ENV_VARS: Final[tuple[str, ...]] = (
    "GPS_DENIED_FC_PROFILE",
    "GPS_DENIED_TIER",
    "DB_URL",
    "CAMERA_CALIBRATION_PATH",
    "LOG_LEVEL",
    "LOG_SINK",
    "INFERENCE_BACKEND",
    "FDR_PATH",
    "TILE_CACHE_PATH",
)

# Field-name -> python type.
# We coerce string env vars + raw YAML scalars into the dataclass's declared
# types so `Config.runtime.tier` is always `int` regardless of source.
# Field names that are absent from this table pass through unchanged.
_FIELD_COERCIONS: Final[dict[str, type]] = {
    "tier": int,
    "queue_size": int,
    "level": str,
    "sink": str,
    "path": str,
    "fc_profile": str,
    "db_url": str,
    "camera_calibration_path": str,
    "inference_backend": str,
    "tile_cache_path": str,
    "overrun_policy": str,
    # C8 FC + GCS adapter coercions (AZ-390)
    "adapter": str,
    "port_device": str,
    "port_baud": int,
    "signing_key_source": str,
    "dev_static_signing_key": str,
    "signing_failure_threshold": int,
    "spoof_recovery_source_set": int,
    "source_set_switch_timeout_ms": int,
    "summary_rate_hz": float,
    # Replay block coercions (AZ-401)
    "video_path": str,
    "tlog_path": str,
    "output_path": str,
    "pace": str,
    "target_fc_dialect": str,
}

# Field-name -> python type for the nested ``replay.auto_sync`` mapping.
_REPLAY_AUTO_SYNC_TYPES: Final[dict[str, type]] = {
    "takeoff_accel_threshold_g": float,
    "takeoff_attitude_rate_threshold_rad_s": float,
    "sustained_seconds": float,
    "prescan_max_messages": int,
    "video_motion_threshold": float,
    "video_motion_scan_seconds": float,
    "match_threshold_pct": float,
    "match_window_ms": int,
    "low_confidence_threshold": float,
}

# Top-level YAML keys that hold a scalar rather than a block mapping.
_TOP_LEVEL_SCALAR_FIELDS: Final[frozenset[str]] = frozenset({"mode"})


def _coerce_scalar(label: str, raw: Any, target_type: type) -> Any:
    """Return ``raw`` as an instance of ``target_type``, converting if needed.

    Shared by every coercion helper in this module so they agree on what
    counts as "already the right type" and on the error they raise.

    Args:
        label: Pre-formatted field label embedded in the error message.
        raw: Value taken from the environment or from parsed YAML.
        target_type: Callable type (``int``, ``float``, ``str``) to apply.

    Raises:
        RequiredFieldMissingError: ``target_type(raw)`` raised ``TypeError``,
            ``ValueError`` or ``OverflowError``.
    """
    # ``bool`` is a subclass of ``int``; without this guard YAML ``true``
    # would be stored verbatim in a field declared as ``int``.
    is_bool_for_int = target_type is int and isinstance(raw, bool)
    if isinstance(raw, target_type) and not is_bool_for_int:
        return raw
    try:
        return target_type(raw)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError covers ``int(float("inf"))`` (YAML ``.inf``).
        raise RequiredFieldMissingError(
            f"config field {label}: cannot coerce {raw!r} to {target_type.__name__} ({exc})"
        ) from exc


def _coerce_value(field_name: str, raw: Any) -> Any:
    """Coerce ``raw`` to the type registered for ``field_name``.

    Fields without an entry in `_FIELD_COERCIONS` are returned unchanged.

    Raises:
        RequiredFieldMissingError: the value cannot be converted.
    """
    target_type = _FIELD_COERCIONS.get(field_name)
    if target_type is None:
        return raw
    return _coerce_scalar(repr(field_name), raw, target_type)


def _coerce_optional_int(field_name: str, raw: Any) -> int | None:
    """Coerce ``raw`` to ``int`` or ``None`` (empty / null sentinels become ``None``).

    Raises:
        RequiredFieldMissingError: a non-empty value cannot be converted.
    """
    if raw is None:
        return None
    if isinstance(raw, str) and raw.strip() == "":
        return None
    return _coerce_scalar(repr(field_name), raw, int)


def _coerce_replay_auto_sync_field(field_name: str, raw: Any) -> Any:
    """Coerce one ``replay.auto_sync`` entry to its registered type.

    Fields without an entry in `_REPLAY_AUTO_SYNC_TYPES` are returned
    unchanged.

    Raises:
        RequiredFieldMissingError: the value cannot be converted.
    """
    target_type = _REPLAY_AUTO_SYNC_TYPES.get(field_name)
    if target_type is None:
        return raw
    return _coerce_scalar(f"replay.auto_sync.{field_name}", raw, target_type)


def _build_block(defaults: Any, overrides: Mapping[str, Any]) -> Any:
    """Apply type-coerced ``overrides`` on top of a default block instance."""
    return _replace_block(
        defaults,
        {k: _coerce_value(k, v) for k, v in overrides.items()},
    )


def _build_replay_block(overrides: Mapping[str, Any]) -> ReplayConfig:
    """Build a :class:`ReplayConfig` from YAML/env overrides.

    Handles two non-trivial coercions the generic path cannot:

    * ``time_offset_ms`` — ``int | None`` (empty string / None → None).
    * ``auto_sync`` — nested mapping → :class:`ReplayAutoSyncConfig`.

    Raises:
        ConfigError: ``auto_sync`` is present, non-null and not a mapping.
        RequiredFieldMissingError: a value cannot be coerced to its type.
    """
    flat: dict[str, Any] = {}
    auto_sync_overrides: Mapping[str, Any] = {}
    for key, value in overrides.items():
        if key == "auto_sync":
            # A null ``auto_sync`` leaves the nested block at its defaults.
            if value is None:
                continue
            if not isinstance(value, Mapping):
                raise ConfigError(
                    f"replay.auto_sync must be a mapping; got {type(value).__name__}"
                )
            auto_sync_overrides = value
        elif key == "time_offset_ms":
            flat[key] = _coerce_optional_int(key, value)
        else:
            flat[key] = _coerce_value(key, value)

    # The nested block is always materialised, even with no overrides.
    flat["auto_sync"] = _replace_block(
        ReplayAutoSyncConfig(),
        {k: _coerce_replay_auto_sync_field(k, v) for k, v in auto_sync_overrides.items()},
    )
    return _replace_block(ReplayConfig(), flat)


def _load_yaml_files(paths: Sequence[Path]) -> dict[str, dict[str, Any]]:
    """Merge YAML files in order: later paths win for the same block + field.

    Top-level scalar fields named in :data:`_TOP_LEVEL_SCALAR_FIELDS`
    (currently ``mode``) are collected under the synthetic ``__top__`` block
    so the ``Config`` outer fields can be overridden alongside the nested
    cross-cutting / component blocks.

    The merge is one level deep: a nested mapping inside a block (for
    example ``replay.auto_sync``) is replaced wholesale by a later file.
    Any other top-level key whose value is not a mapping is ignored.

    Raises:
        RequiredFieldMissingError: a file's top-level node is not a mapping.
    """
    merged: dict[str, dict[str, Any]] = {}
    for path in paths:
        # Pin the encoding so parsing does not depend on the host locale.
        data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
        if not isinstance(data, dict):
            raise RequiredFieldMissingError(
                f"YAML at {path} must be a mapping at the top level; got {type(data).__name__}"
            )
        for block_name, block_value in data.items():
            if block_name in _TOP_LEVEL_SCALAR_FIELDS:
                merged.setdefault("__top__", {})[block_name] = block_value
                continue
            if not isinstance(block_value, dict):
                continue
            merged.setdefault(block_name, {}).update(block_value)
    return merged


def _apply_env_overrides(layered: dict[str, dict[str, Any]], env: Mapping[str, str]) -> None:
    """Overlay env-var values on the per-block override dictionaries.

    ``layered`` is mutated in place. A variable that is present in ``env``
    overrides the YAML value even when it is an empty string.
    """
    for env_key, (block_name, field_name) in ENV_KEY_MAP.items():
        if env_key not in env:
            continue
        layered.setdefault(block_name, {})[field_name] = env[env_key]


def _check_required_env(env: Mapping[str, str]) -> None:
    """AC-6 + AZ-263 AC-8: missing required vars fail fast with a pointer.

    A variable counts as missing when it is absent or set to an empty value.

    Raises:
        RequiredFieldMissingError: one or more required variables are missing;
            the message lists all of them.
    """
    missing = [name for name in _REQUIRED_ENV_VARS if not env.get(name)]
    if missing:
        raise RequiredFieldMissingError(
            "Missing required environment variable(s): "
            + ", ".join(missing)
            + ". See `.env.example` for the documented set."
        )


def load_config(
    env: Mapping[str, str],
    paths: Sequence[Path] = (),
    *,
    require_env: bool = True,
) -> Config:
    """Build a frozen `Config` from env + YAML files + documented defaults.

    Precedence: env > YAML > defaults. `paths` may be empty; missing keys
    fall to the dataclass-declared defaults.

    ``mode`` is resolved separately: the YAML top-level ``mode`` is used when
    present, otherwise the ``MODE`` env var, otherwise ``"live"``. The value
    is stripped and lower-cased before being handed to `Config`.

    Args:
        env: Environment mapping (typically ``os.environ``).
        paths: YAML files to merge, in order; later files override earlier.
        require_env: When true, verify the required env vars first.

    Raises:
        RequiredFieldMissingError: a required env var is missing, a YAML file
            is not a mapping, or a value cannot be coerced to its field type.
        ConfigError: ``replay.auto_sync`` is not a mapping.
    """
    if require_env:
        _check_required_env(env)

    layered = _load_yaml_files(paths) if paths else {}
    _apply_env_overrides(layered, env)

    runtime_block = _build_block(RuntimeConfig(), layered.get("runtime", {}))
    log_block = _build_block(LogConfig(), layered.get("log", {}))
    fdr_block = _build_block(FdrConfig(), layered.get("fdr", {}))
    fc_block = _build_block(FcConfig(), layered.get("fc", {}))
    gcs_block = _build_block(GcsConfig(), layered.get("gcs", {}))
    replay_block = _build_replay_block(layered.get("replay", {}))

    # NOTE(review): a YAML ``mode`` wins over the ``MODE`` env var here, the
    # reverse of the env > YAML precedence used for every other field.
    # Left unchanged because flipping it alters the runtime mode of existing
    # deployments — confirm whether this ordering is intended.
    raw_mode = layered.get("__top__", {}).get("mode")
    if raw_mode is None:
        raw_mode = env.get("MODE", "live")
    mode = str(raw_mode).strip().lower()

    # Start from the resolved component blocks and rebuild only the
    # components that actually received overrides.
    component_blocks = _resolve_component_blocks()
    for slug, dataclass_type in _COMPONENT_REGISTRY.items():
        block_overrides = layered.get(slug, {})
        if block_overrides:
            component_blocks[slug] = _build_block(dataclass_type(), block_overrides)

    return Config(
        runtime=runtime_block,
        log=log_block,
        fdr=fdr_block,
        fc=fc_block,
        gcs=gcs_block,
        replay=replay_block,
        mode=mode,  # type: ignore[arg-type]  # validated by Config.__post_init__
        components=component_blocks,
    )