Files
gps-denied-onboard/src/gps_denied/pipeline/composition.py
T
Yuzviak 7e64ef8d2b feat(stage2-phase2): structlog hot-path, pytest markers, obs package
Phase 2 deliverables not yet committed from plan execution:
- structlog wired to 10 hot-path files (orchestrator, eskf, components)
- bind_contextvars(correlation_id=frame_id) in process_frame
- obs/logging_config.py: configure_logging(env) JSON/console renderer
- pyproject.toml: structlog>=25.1, --strict-markers, 6 markers registered
- tests/conftest.py: ac(id) validator plugin + pytest_collection hooks

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 19:06:47 +03:00

162 lines
6.1 KiB
Python

"""Env-aware pipeline composition root (ARCH-01 / ARCH-03, Plan 08).
``build_pipeline`` is the single entry point that wires all concrete adapters
into a :class:`FlightProcessor`. Callers in ``app.py`` and ``api/deps.py``
should import this function rather than instantiating components directly.
Env-conditional wiring
----------------------
- ``env="jetson"`` → prefer_cuvslam=True, prefer_mono_depth=True
- ``env="x86_dev"`` → prefer_cuvslam=False, prefer_mono_depth=False
- ``env="ci"`` → prefer_cuvslam=False, prefer_mono_depth=False
- ``env="sitl"`` → prefer_cuvslam=False, prefer_mono_depth=False
"""
from __future__ import annotations
import logging
from typing import Optional
from gps_denied.obs import configure_logging
logger = logging.getLogger(__name__)
def build_pipeline(
    env: str = "x86_dev",
    config=None,
    repository=None,
    streamer=None,
) -> "FlightProcessor":
    """Build and return a fully-wired :class:`FlightProcessor`.

    Parameters
    ----------
    env:
        Target runtime environment. One of ``"jetson"``, ``"x86_dev"``,
        ``"ci"``, ``"sitl"``. Only ``"jetson"`` enables the cuVSLAM and
        mono-depth preferences; every other value gets the non-accelerated
        wiring.
    config:
        Optional :class:`~gps_denied.config.AppSettings` instance. When
        ``None``, a fresh ``AppSettings()`` is constructed.
    repository:
        Optional :class:`~gps_denied.db.repository.FlightRepository`.
        ``None`` is acceptable for smoke-tests / lifespan startup; ``deps.py``
        swaps in a real session-scoped instance per request.
    streamer:
        Optional :class:`~gps_denied.pipeline.sse_streamer.SSEEventStreamer`.
        Defaults to a fresh in-process instance when ``None``.

    Returns
    -------
    FlightProcessor
        Fully wired processor with all components attached. The ``mavlink``
        component is ``None`` when the MAVLink bridge could not be
        instantiated; that failure is logged, not raised.
    """
    # OBS-01: configure structlog idempotently so tests / scripts calling
    # build_pipeline directly (without app.py lifespan) still get logging configured.
    configure_logging(env=env)

    # Lazy imports to avoid circular import chains at module load time.
    from gps_denied.components.gpr.faiss_gpr import GlobalPlaceRecognition
    from gps_denied.components.mavlink_io.pymavlink_bridge import MAVLinkBridge
    from gps_denied.components.satellite_matcher.local_tile_loader import SatelliteDataManager
    from gps_denied.components.satellite_matcher.metric_refinement import MetricRefinement
    from gps_denied.components.vio.factory import create_vo_backend
    from gps_denied.core.chunk_manager import RouteChunkManager
    from gps_denied.core.coordinates import CoordinateTransformer
    from gps_denied.core.factor_graph import FactorGraphOptimizer
    from gps_denied.core.models import ModelManager
    from gps_denied.core.recovery import FailureRecoveryCoordinator
    from gps_denied.core.rotation import ImageRotationManager
    from gps_denied.pipeline.orchestrator import FlightProcessor
    from gps_denied.pipeline.sse_streamer import SSEEventStreamer
    from gps_denied.schemas.graph import FactorGraphConfig

    if config is None:
        from gps_denied.config import AppSettings

        config = AppSettings()

    if streamer is None:
        streamer = SSEEventStreamer()

    # ------------------------------------------------------------------
    # Env-conditional flags
    # ------------------------------------------------------------------
    prefer_cuvslam = env == "jetson"
    prefer_mono_depth = env == "jetson"

    # ------------------------------------------------------------------
    # Model manager — ModelManager auto-selects TRT on Jetson
    # ------------------------------------------------------------------
    mm = ModelManager(engine_dir=str(config.models.weights_dir))

    # ------------------------------------------------------------------
    # Component wiring (mirrors lifespan in app.py)
    # ------------------------------------------------------------------
    vo = create_vo_backend(
        model_manager=mm,
        prefer_cuvslam=prefer_cuvslam,
        prefer_mono_depth=prefer_mono_depth,
    )
    gpr = GlobalPlaceRecognition(mm)
    metric = MetricRefinement(mm)
    graph = FactorGraphOptimizer(FactorGraphConfig())
    chunk_mgr = RouteChunkManager(graph)
    recovery = FailureRecoveryCoordinator(chunk_mgr, gpr, metric)
    rotation = ImageRotationManager(mm)
    coord = CoordinateTransformer()
    satellite = SatelliteDataManager(tile_dir=config.satellite.tile_dir)

    # MAVLink is best-effort in every env: the bridge is always attempted and
    # any construction failure falls back to ``None`` instead of aborting the
    # build. Only the log level/message differs — a failure in ci (which may
    # have no network) is expected and logged at INFO; elsewhere it is a
    # WARNING.
    try:
        mavlink = MAVLinkBridge(
            connection_string=config.mavlink.connection,
            output_hz=config.mavlink.output_hz,
            telemetry_hz=config.mavlink.telemetry_hz,
        )
    except Exception as exc:
        mavlink = None
        if env == "ci":
            logger.info("MAVLink skipped in ci env: %s", exc)
        else:
            logger.warning("MAVLink bridge instantiation failed (env=%s): %s", env, exc)

    # ------------------------------------------------------------------
    # Construct processor and attach all components
    # ------------------------------------------------------------------
    from gps_denied.schemas.eskf import ESKFConfig

    # Re-validate the app-level ESKF settings into the schema-level config.
    eskf_config = ESKFConfig(**config.eskf.model_dump())
    processor = FlightProcessor(
        repository=repository,
        streamer=streamer,
        eskf_config=eskf_config,
    )
    processor.attach_components(
        vo=vo,
        gpr=gpr,
        metric=metric,
        graph=graph,
        recovery=recovery,
        chunk_mgr=chunk_mgr,
        rotation=rotation,
        coord=coord,
        satellite=satellite,
        mavlink=mavlink,
    )

    # Report "disabled" only when no bridge object exists; use an identity
    # check so the message does not depend on the bridge's truthiness.
    logger.info(
        "Pipeline built: env=%s, prefer_cuvslam=%s, prefer_mono_depth=%s, mavlink=%s",
        env,
        prefer_cuvslam,
        prefer_mono_depth,
        config.mavlink.connection if mavlink is not None else "disabled",
    )
    return processor