"""Env-aware pipeline composition root (ARCH-01 / ARCH-03, Plan 08). ``build_pipeline`` is the single entry point that wires all concrete adapters into a :class:`FlightProcessor`. Callers in ``app.py`` and ``api/deps.py`` should import this function rather than instantiating components directly. Env-conditional wiring ---------------------- - ``env="jetson"`` → prefer_cuvslam=True, prefer_mono_depth=True - ``env="x86_dev"`` → prefer_cuvslam=False, prefer_mono_depth=False - ``env="ci"`` → prefer_cuvslam=False, prefer_mono_depth=False - ``env="sitl"`` → prefer_cuvslam=False, prefer_mono_depth=False """ from __future__ import annotations import logging from typing import Optional logger = logging.getLogger(__name__) def build_pipeline( env: str = "x86_dev", config=None, repository=None, streamer=None, ) -> "FlightProcessor": """Build and return a fully-wired :class:`FlightProcessor`. Parameters ---------- env: Target runtime environment. One of ``"jetson"``, ``"x86_dev"``, ``"ci"``, ``"sitl"``. config: Optional :class:`~gps_denied.config.AppSettings` instance. When ``None``, a fresh ``AppSettings()`` is constructed. repository: Optional :class:`~gps_denied.db.repository.FlightRepository`. ``None`` is acceptable for smoke-tests / lifespan startup; ``deps.py`` swaps in a real session-scoped instance per request. streamer: Optional :class:`~gps_denied.pipeline.sse_streamer.SSEEventStreamer`. Defaults to a fresh in-process instance when ``None``. Returns ------- FlightProcessor Fully wired processor with all components attached. """ # Lazy imports to avoid circular import chains at module load time. from gps_denied.components.gpr.faiss_gpr import GlobalPlaceRecognition from gps_denied.components.mavlink_io.pymavlink_bridge import MAVLinkBridge from gps_denied.components.satellite_matcher.local_tile_loader import SatelliteDataManager from gps_denied.components.satellite_matcher.metric_refinement import MetricRefinement from gps_denied.components.vio.factory import create_vo_backend from gps_denied.core.chunk_manager import RouteChunkManager from gps_denied.core.coordinates import CoordinateTransformer from gps_denied.core.factor_graph import FactorGraphOptimizer from gps_denied.core.models import ModelManager from gps_denied.core.recovery import FailureRecoveryCoordinator from gps_denied.core.rotation import ImageRotationManager from gps_denied.pipeline.orchestrator import FlightProcessor from gps_denied.pipeline.sse_streamer import SSEEventStreamer from gps_denied.schemas.graph import FactorGraphConfig if config is None: from gps_denied.config import AppSettings config = AppSettings() if streamer is None: streamer = SSEEventStreamer() # ------------------------------------------------------------------ # Env-conditional flags # ------------------------------------------------------------------ prefer_cuvslam = env == "jetson" prefer_mono_depth = env == "jetson" # ------------------------------------------------------------------ # Model manager — ModelManager auto-selects TRT on Jetson # ------------------------------------------------------------------ mm = ModelManager(engine_dir=str(config.models.weights_dir)) # ------------------------------------------------------------------ # Component wiring (mirrors lifespan in app.py) # ------------------------------------------------------------------ vo = create_vo_backend( model_manager=mm, prefer_cuvslam=prefer_cuvslam, prefer_mono_depth=prefer_mono_depth, ) gpr = GlobalPlaceRecognition(mm) metric = MetricRefinement(mm) graph = FactorGraphOptimizer(FactorGraphConfig()) chunk_mgr = RouteChunkManager(graph) recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric) rotation = ImageRotationManager(mm) coord = CoordinateTransformer() satellite = SatelliteDataManager(tile_dir=config.satellite.tile_dir) # MAVLink: ci env may have no network — catch and fall back to None mavlink = None if env != "ci": try: mavlink = MAVLinkBridge( connection_string=config.mavlink.connection, output_hz=config.mavlink.output_hz, telemetry_hz=config.mavlink.telemetry_hz, ) except Exception as exc: logger.warning("MAVLink bridge instantiation failed (env=%s): %s", env, exc) else: # ci: attempt anyway but tolerate failures try: mavlink = MAVLinkBridge( connection_string=config.mavlink.connection, output_hz=config.mavlink.output_hz, telemetry_hz=config.mavlink.telemetry_hz, ) except Exception as exc: logger.info("MAVLink skipped in ci env: %s", exc) mavlink = None # ------------------------------------------------------------------ # Construct processor and attach all components # ------------------------------------------------------------------ from gps_denied.schemas.eskf import ESKFConfig eskf_config = ESKFConfig(**config.eskf.model_dump()) processor = FlightProcessor( repository=repository, streamer=streamer, eskf_config=eskf_config, ) processor.attach_components( vo=vo, gpr=gpr, metric=metric, graph=graph, recovery=recovery, chunk_mgr=chunk_mgr, rotation=rotation, coord=coord, satellite=satellite, mavlink=mavlink, ) logger.info( "Pipeline built: env=%s, prefer_cuvslam=%s, prefer_mono_depth=%s, mavlink=%s", env, prefer_cuvslam, prefer_mono_depth, config.mavlink.connection if mavlink else "disabled", ) return processor