"""Composition-root factories for C8 (AZ-390 / E-C8). Lazy-imports the per-variant adapter classes so the ADR-002 build-flag gate stays honest: the binary's bootstrap (one module per ``BUILD_FC_*`` / ``BUILD_GCS_*`` combination) registers the concrete strategy via :func:`register_fc_adapter` / :func:`register_gcs_adapter` ahead of `build_fc_adapter` / `build_gcs_adapter`. A second binding to the outbound emit thread is rejected (AC-6); the single-writer invariant for outbound is enforced statically by the composition root, not by the adapter itself. """ from __future__ import annotations import os import threading from collections.abc import Callable from typing import Any, Final from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, GcsAdapterConfigError, ) from gps_denied_onboard.components.c8_fc_adapter.interface import ( FcAdapter, GcsAdapter, ) from gps_denied_onboard.config import Config from gps_denied_onboard.logging import get_logger __all__ = [ "OutboundThreadAlreadyBoundError", "bind_outbound_emit_thread", "build_fc_adapter", "build_gcs_adapter", "clear_outbound_thread_binding", "clear_strategy_registries", "list_registered_fc_strategies", "list_registered_gcs_strategies", "register_fc_adapter", "register_gcs_adapter", ] # ---------------------------------------------------------------------- # Strategy registries (single source of truth; populated by binary # bootstrap modules per ADR-002). FcAdapterFactory = Callable[..., FcAdapter] GcsAdapterFactory = Callable[..., GcsAdapter] _FC_REGISTRY: dict[str, FcAdapterFactory] = {} _GCS_REGISTRY: dict[str, GcsAdapterFactory] = {} # Mapping from strategy slug -> documented BUILD_*_ flag name. The # build-flag gate (AC-4) checks ``os.environ`` for the canonical name # because flag state is a build-time artifact, not a config-time # artifact. 
_FC_BUILD_FLAGS: Final[dict[str, str]] = {
    "ardupilot_plane": "BUILD_FC_ARDUPILOT_PLANE",
    "inav": "BUILD_FC_INAV",
}
_GCS_BUILD_FLAGS: Final[dict[str, str]] = {
    "qgc_mavlink": "BUILD_GCS_QGC_MAVLINK",
}


def register_fc_adapter(strategy: str, factory: FcAdapterFactory) -> None:
    """Register a concrete `FcAdapter` strategy.

    Called from the per-binary bootstrap module (e.g.
    ``runtime_root._bootstrap_ap.py``) under the matching ``BUILD_FC_``
    flag. Registering the very same factory object again is accepted;
    duplicate registration with a different factory is a build error and
    raises :class:`FcAdapterConfigError`.
    """
    existing = _FC_REGISTRY.get(strategy)
    # Identity comparison: only the exact same factory object may re-register.
    if existing is not None and existing is not factory:
        raise FcAdapterConfigError(f"duplicate FcAdapter registration for strategy {strategy!r}")
    _FC_REGISTRY[strategy] = factory


def register_gcs_adapter(strategy: str, factory: GcsAdapterFactory) -> None:
    """Register a concrete `GcsAdapter` strategy.

    Mirrors :func:`register_fc_adapter`: registering the very same factory
    object again is accepted; a different factory for an already
    registered ``strategy`` raises :class:`GcsAdapterConfigError`.
    """
    existing = _GCS_REGISTRY.get(strategy)
    # Identity comparison: only the exact same factory object may re-register.
    if existing is not None and existing is not factory:
        raise GcsAdapterConfigError(f"duplicate GcsAdapter registration for strategy {strategy!r}")
    _GCS_REGISTRY[strategy] = factory


def clear_strategy_registries() -> None:
    """Reset both registries; intended for unit-test isolation only."""
    _FC_REGISTRY.clear()
    _GCS_REGISTRY.clear()


def list_registered_fc_strategies() -> list[str]:
    """Return the registered FC strategy slugs in sorted order."""
    return sorted(_FC_REGISTRY)


def list_registered_gcs_strategies() -> list[str]:
    """Return the registered GCS strategy slugs in sorted order."""
    return sorted(_GCS_REGISTRY)


# ----------------------------------------------------------------------
# Single-writer outbound thread enforcement (Invariant 8 / AC-6).


class OutboundThreadAlreadyBoundError(RuntimeError):
    """Raised when :func:`bind_outbound_emit_thread` is asked to re-bind.

    Only a bind to a thread ident different from the one already bound
    raises; repeating the existing binding does not.
    """


# Guards every read/write of ``_outbound_bound_thread``.
_outbound_lock = threading.Lock()
# Ident of the bound emit thread, or ``None`` while nothing is bound.
_outbound_bound_thread: int | None = None


def bind_outbound_emit_thread(thread_ident: int | None = None) -> int:
    """Bind ``thread_ident`` (defaults to the caller) as the sole emit thread.

    Once a thread is bound, a call naming a *different* thread ident
    raises :class:`OutboundThreadAlreadyBoundError`. A repeated call with
    the already-bound ident is accepted and returns that ident again.

    The runtime root calls this once per process before wiring outbound
    emit; the result is the canonical thread id the adapter checks on
    every outbound call.

    Returns the ident that is bound after the call.
    """
    global _outbound_bound_thread
    ident = thread_ident if thread_ident is not None else threading.get_ident()
    with _outbound_lock:
        if _outbound_bound_thread is not None and _outbound_bound_thread != ident:
            raise OutboundThreadAlreadyBoundError(
                f"outbound emit thread already bound to {_outbound_bound_thread}; "
                f"refused to re-bind to {ident}"
            )
        _outbound_bound_thread = ident
    return ident


def clear_outbound_thread_binding() -> None:
    """Reset the outbound-thread binding; intended for unit-test isolation."""
    global _outbound_bound_thread
    with _outbound_lock:
        _outbound_bound_thread = None


# ----------------------------------------------------------------------
# Build helpers — invoked by `compose_root` after C5 (FC) and after
# the FC adapter (GCS).


def _select_strategy_factory(
    *,
    label: str,
    config_key: str,
    strategy: str,
    build_flags: dict[str, str],
    registry: dict[str, Callable[..., Any]],
    error_cls: type[Exception],
) -> Callable[..., Any]:
    """Run the shared gate sequence and return the factory for ``strategy``.

    Checks, in order: the strategy has a build-flag mapping, the flag is
    not switched off in ``os.environ``, and a factory is registered. Any
    failure raises ``error_cls`` with a message prefixed by ``label``
    (``"FC"`` or ``"GCS"``).
    """
    flag_name = build_flags.get(strategy)
    if flag_name is None:
        # The strategy slug is expected to have passed config validation
        # already, so a missing entry means the build-flag table was not
        # updated for it — fail loudly.
        raise error_cls(f"{label} strategy {strategy!r} has no BUILD_{label}_* flag mapping")
    # An unset flag counts as enabled; only a case-insensitive "OFF" disables.
    if os.environ.get(flag_name, "ON").upper() == "OFF":
        raise error_cls(
            f"{flag_name} is OFF — strategy {strategy!r} is not linked into this binary"
        )
    factory = registry.get(strategy)
    if factory is None:
        raise error_cls(
            f"{label} strategy {strategy!r} is selected by {config_key} but "
            f"not registered; registered strategies: "
            f"{sorted(registry)}"
        )
    return factory


def build_fc_adapter(config: Config, **deps: Any) -> FcAdapter:
    """Resolve and build the configured `FcAdapter` strategy.

    Validates the build-flag gate (AC-4); raises
    :class:`FcAdapterConfigError` with the disabled-flag name when the
    requested strategy is not linked into the running binary, and also
    when the strategy has no flag mapping or no registered factory.

    The factory is called as ``factory(config=config, **deps)``; a
    ``c8.adapter.strategy_loaded`` record is logged after it returns.
    """
    strategy = config.fc.adapter
    factory = _select_strategy_factory(
        label="FC",
        config_key="config.fc.adapter",
        strategy=strategy,
        build_flags=_FC_BUILD_FLAGS,
        registry=_FC_REGISTRY,
        error_cls=FcAdapterConfigError,
    )
    adapter = factory(config=config, **deps)
    _log_strategy_loaded(
        kind="c8.adapter.strategy_loaded",
        strategy=strategy,
        port_device=config.fc.port_device,
    )
    return adapter


def build_gcs_adapter(config: Config, **deps: Any) -> GcsAdapter:
    """Resolve and build the configured `GcsAdapter` strategy (AC-7).

    Applies the same gate sequence as :func:`build_fc_adapter` against the
    GCS tables and raises :class:`GcsAdapterConfigError` on failure.

    The factory is called as ``factory(config=config, **deps)``; a
    ``c8.gcs.strategy_loaded`` record is logged after it returns.
    """
    strategy = config.gcs.adapter
    factory = _select_strategy_factory(
        label="GCS",
        config_key="config.gcs.adapter",
        strategy=strategy,
        build_flags=_GCS_BUILD_FLAGS,
        registry=_GCS_REGISTRY,
        error_cls=GcsAdapterConfigError,
    )
    adapter = factory(config=config, **deps)
    _log_strategy_loaded(
        kind="c8.gcs.strategy_loaded",
        strategy=strategy,
        port_device=config.gcs.port_device,
    )
    return adapter


def _log_strategy_loaded(*, kind: str, strategy: str, port_device: str) -> None:
    """Emit one info record announcing that a strategy was built.

    The same ``runtime_root.fc_factory`` logger is used for both FC and
    GCS; ``kind`` distinguishes the two. The fields are carried both in
    the message text and in ``extra``.
    """
    log = get_logger("runtime_root.fc_factory")
    log.info(
        f"{kind}: strategy={strategy} port_device={port_device}",
        extra={
            "kind": kind,
            "kv": {"strategy": strategy, "port_device": port_device},
        },
    )