feat(stage2-phase2): structlog hot-path, pytest markers, obs package

Phase 2 deliverables not yet committed from plan execution:
- structlog wired to 10 hot-path files (orchestrator, eskf, components)
- bind_contextvars(correlation_id=frame_id) in process_frame
- obs/logging_config.py: configure_logging(env) JSON/console renderer
- pyproject.toml: structlog>=25.1, --strict-markers, 6 markers registered
- tests/conftest.py: ac(id) validator plugin + pytest_collection hooks

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Yuzviak
2026-05-11 19:06:47 +03:00
parent 7f76acfe29
commit 7e64ef8d2b
15 changed files with 286 additions and 78 deletions
+14 -1
View File
@@ -17,9 +17,11 @@ dependencies = [
"diskcache>=5.6",
"numpy>=1.26,<2.0", # NumPy 2.0 silently breaks GTSAM Python bindings (issue #2264)
"opencv-python-headless>=4.9,<4.11", # 4.11+ requires numpy>=2.0 (incompatible with GTSAM)
"orjson>=3.10",
"gtsam>=4.3a0",
"pymavlink>=2.4",
"pyyaml>=6.0",
"structlog>=25.1,<26",
]
[project.optional-dependencies]
@@ -53,8 +55,19 @@ select = ["E", "F", "I", "W"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
# --strict-markers makes unregistered @pytest.mark.<x> fail collection rather than warn.
# Per Phase 2 / TEST-02 contract; do not weaken without an explicit Phase 2 retrospective entry.
addopts = "--strict-markers"
markers = [
"e2e: end-to-end test against a real dataset",
# Phase 2 / TEST-01 taxonomy
"unit: pure-math or single-class test; only mocks; no I/O / no real DB / no real engines / no SITL; runs in <1s",
"integration: cross-subsystem (in-memory SQLite, ASGI transport, full FlightProcessor wiring across >=3 real components); no external process",
"blackbox: validates an external contract (e.g. MAVLink GPS_INPUT wire encoding per MAVLink #232) without a live producer",
"sitl: requires ARDUPILOT_SITL_HOST env var; talks to an ArduPilot SITL process over MAVLink; nightly-only",
"e2e: full-pipeline run against a real dataset (EuRoC, VPAIR, MARS-LVIG, Azaion) or its synthetic stand-in via E2EHarness; nightly-only",
# Phase 2 / TEST-03 traceability
"ac(ac_id): link test to one or more Acceptance Criteria (e.g. AC-1.1, AC-NEW-3); validated against _docs/00_problem/acceptance_criteria.md by scripts/gen_ac_traceability.py",
# Pre-existing (kept verbatim)
"e2e_slow: e2e test that takes > 2 minutes, nightly-only",
"needs_dataset: test requires an external dataset to be downloaded",
]
+4
View File
@@ -8,6 +8,7 @@ from fastapi import FastAPI
from gps_denied import __version__
from gps_denied.api.routers import flights
from gps_denied.config import RuntimeConfig
from gps_denied.obs import configure_logging
from gps_denied.pipeline import build_pipeline
logger = logging.getLogger(__name__)
@@ -17,6 +18,9 @@ logger = logging.getLogger(__name__)
async def lifespan(app: FastAPI):
"""Initialise core pipeline components on startup via build_pipeline."""
cfg = RuntimeConfig()
# OBS-01: configure structlog ONCE before pipeline construction.
# Non-hot-path logging (api/, db/) continues to use stdlib until Phase 6.
configure_logging(env=cfg.env)
processor = build_pipeline(env=cfg.env, config=cfg)
# Retrieve MAVLink bridge from processor internals for lifecycle management
@@ -7,12 +7,12 @@ The legacy import path (gps_denied.core.mavlink) re-exports everything here.
from __future__ import annotations
import asyncio
import logging
import math
import time
from typing import Callable, Optional
import numpy as np
import structlog
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ConfidenceTier, ESKFState, IMUMeasurement
@@ -22,7 +22,7 @@ from gps_denied.schemas.mavlink import (
TelemetryMessage,
)
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
# pymavlink conditional import
@@ -30,11 +30,11 @@ logger = logging.getLogger(__name__)
try:
from pymavlink import mavutil as _mavutil # type: ignore
_PYMAVLINK_AVAILABLE = True
logger.info("pymavlink available — real MAVLink connection enabled")
logger.info("pymavlink_available", mode="real_mavlink")
except ImportError:
_mavutil = None # type: ignore
_PYMAVLINK_AVAILABLE = False
logger.info("pymavlink not available — using MockMAVConnection (dev/CI mode)")
logger.info("pymavlink_unavailable", fallback="MockMAVConnection")
# GPS epoch offset from Unix epoch (seconds)
_GPS_EPOCH_OFFSET = 315_964_800
@@ -211,7 +211,7 @@ class MAVLinkBridge:
asyncio.create_task(self._imu_receive_loop(), name="mav_imu_input"),
asyncio.create_task(self._telemetry_loop(), name="mav_telemetry"),
]
logger.info("MAVLinkBridge started (conn=%s, %g Hz)", self.connection_string, self.output_hz)
logger.info("mavlink_bridge_started", conn=self.connection_string, output_hz=self.output_hz)
async def stop(self) -> None:
"""Cancel background tasks and close connection."""
@@ -223,8 +223,7 @@ class MAVLinkBridge:
if self._conn:
self._conn.close()
self._conn = None
logger.info("MAVLinkBridge stopped. sent=%d imu_rx=%d",
self._sent_count, self._recv_imu_count)
logger.info("mavlink_bridge_stopped", sent=self._sent_count, imu_rx=self._recv_imu_count)
def build_gps_input(self) -> Optional[GPSInputMessage]:
"""Build GPSInputMessage from current ESKF state (public, for testing)."""
@@ -238,6 +237,7 @@ class MAVLinkBridge:
async def _gps_output_loop(self) -> None:
"""Send GPS_INPUT at output_hz. MAV-01 / MAV-02."""
log = logger.bind(task="gps_output_loop")
interval = 1.0 / self.output_hz
while self._running:
try:
@@ -250,7 +250,7 @@ class MAVLinkBridge:
if self._consecutive_failures >= self.max_consecutive_failures:
self._send_reloc_request()
except Exception as exc:
logger.warning("GPS output loop error: %s", exc)
log.warning("gps_output_loop_error", error=str(exc))
await asyncio.sleep(interval)
def _send_gps_input(self, msg: GPSInputMessage) -> None:
@@ -289,7 +289,7 @@ class MAVLinkBridge:
lon=msg.lon,
)
except Exception as exc:
logger.error("Failed to send GPS_INPUT: %s", exc)
logger.error("gps_input_send_failed", error=str(exc))
# ------------------------------------------------------------------
# MAV-03: IMU receive loop
@@ -297,6 +297,7 @@ class MAVLinkBridge:
async def _imu_receive_loop(self) -> None:
"""Receive ATTITUDE/RAW_IMU and invoke ESKF callback. MAV-03."""
log = logger.bind(task="imu_receive_loop")
while self._running:
try:
raw = self._recv_imu()
@@ -305,7 +306,7 @@ class MAVLinkBridge:
if self._on_imu:
self._on_imu(raw)
except Exception as exc:
logger.warning("IMU receive loop error: %s", exc)
log.warning("imu_receive_loop_error", error=str(exc))
await asyncio.sleep(0.01) # poll at ~100 Hz; blocks throttled by recv_match timeout
def _recv_imu(self) -> Optional[IMUMeasurement]:
@@ -334,7 +335,7 @@ class MAVLinkBridge:
timestamp=t,
)
except Exception as exc:
logger.debug("IMU recv error: %s", exc)
logger.debug("imu_recv_error", error=str(exc))
return None
# ------------------------------------------------------------------
@@ -364,9 +365,9 @@ class MAVLinkBridge:
)
else:
self._conn.named_value_float_send(time=t_boot_ms, name=name, value=value)
logger.warning("Re-localisation request sent (failures=%d)", self._consecutive_failures)
logger.warning("reloc_request_sent", consecutive_failures=self._consecutive_failures)
except Exception as exc:
logger.error("Failed to send reloc request: %s", exc)
logger.error("reloc_request_send_failed", error=str(exc))
def _build_reloc_request(self) -> RelocalizationRequest:
last_lat, last_lon = None, None
@@ -393,13 +394,14 @@ class MAVLinkBridge:
async def _telemetry_loop(self) -> None:
"""Send confidence + drift at 1 Hz. MAV-05."""
log = logger.bind(task="telemetry_loop")
interval = 1.0 / self.telemetry_hz
while self._running:
try:
self._send_telemetry()
self._frames_since_sat += 1
except Exception as exc:
logger.warning("Telemetry loop error: %s", exc)
log.warning("telemetry_loop_error", error=str(exc))
await asyncio.sleep(interval)
def _send_telemetry(self) -> None:
@@ -437,7 +439,7 @@ class MAVLinkBridge:
else:
self._conn.named_value_float_send(time=t_boot_ms, name=name, value=float(value))
except Exception as exc:
logger.debug("Telemetry send error: %s", exc)
logger.debug("telemetry_send_error", error=str(exc))
# ------------------------------------------------------------------
# Connection factory
@@ -448,8 +450,8 @@ class MAVLinkBridge:
if _PYMAVLINK_AVAILABLE:
try:
conn = _mavutil.mavlink_connection(self.connection_string)
logger.info("MAVLink connection opened: %s", self.connection_string)
logger.info("mavlink_connection_opened", conn=self.connection_string)
return conn
except Exception as exc:
logger.warning("Cannot open MAVLink connection (%s) — using mock", exc)
logger.warning("mavlink_connection_failed", error=str(exc), fallback="mock")
return MockMAVConnection()
@@ -1,18 +1,20 @@
"""Local-disk tile loader (SAT-01/02). Phase 1 home of the existing SatelliteDataManager impl."""
import hashlib
import logging
import math
import os
from concurrent.futures import ThreadPoolExecutor
import cv2
import numpy as np
import structlog
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.satellite import TileBounds, TileCoords
from gps_denied.utils import mercator
logger = structlog.get_logger(__name__)
class SatelliteDataManager:
"""Manages satellite tiles from a local pre-loaded directory.
@@ -24,8 +26,6 @@ class SatelliteDataManager:
downloads and stores tiles before the mission.
"""
_logger = logging.getLogger(__name__)
def __init__(
self,
tile_dir: str = ".satellite_tiles",
@@ -73,8 +73,10 @@ class SatelliteDataManager:
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
self._logger.warning("Tile integrity failed: %s (exp %s, got %s)",
rel_path, expected[:12], actual[:12])
logger.warning("tile_integrity_failed",
rel_path=rel_path,
expected_prefix=expected[:12],
actual_prefix=actual[:12])
return False
return True
@@ -4,11 +4,11 @@ SAT-03: GSD normalization — downsample camera frame to satellite resolution.
SAT-04: RANSAC homography → WGS84 position; confidence = inlier_ratio.
"""
import logging
from typing import List, Optional, Tuple
import cv2
import numpy as np
import structlog
from gps_denied.components.satellite_matcher.protocol import IMetricRefinement
from gps_denied.core.models import IModelManager
@@ -16,7 +16,7 @@ from gps_denied.schemas import GPSPoint
from gps_denied.schemas.metric import AlignmentResult, ChunkAlignmentResult, Sim3Transform
from gps_denied.schemas.satellite import TileBounds
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
class MetricRefinement(IMetricRefinement):
@@ -19,17 +19,17 @@ hardware; Inertial mode is retained for sprint-1 reversibility.
"""
from __future__ import annotations
import logging
from typing import Optional
import numpy as np
import structlog
from gps_denied.components.vio.orbslam_backend import ORBVisualOdometry
from gps_denied.components.vio.protocol import ISequentialVisualOdometry
from gps_denied.schemas import CameraParameters
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
# Optional cuVSLAM SDK import (aarch64 Jetson only — x86 dev/CI must still pass)
@@ -70,9 +70,9 @@ class CuVSLAMVisualOdometry(ISequentialVisualOdometry):
import cuvslam # type: ignore # only available on Jetson
self._cuvslam = cuvslam
self._init_tracker()
logger.info("CuVSLAMVisualOdometry: cuVSLAM SDK loaded (Jetson mode)")
logger.info("cuvslam_sdk_loaded", backend="CuVSLAMVisualOdometry", mode="jetson")
except ImportError:
logger.info("CuVSLAMVisualOdometry: cuVSLAM not available — using ORB fallback (dev/CI mode)")
logger.info("cuvslam_sdk_unavailable", backend="CuVSLAMVisualOdometry", fallback="ORB")
def _init_tracker(self):
"""Initialise cuVSLAM tracker in Inertial mode."""
@@ -96,9 +96,9 @@ class CuVSLAMVisualOdometry(ISequentialVisualOdometry):
gyro_noise=self._imu_params.get("gyro_noise", 0.001),
)
self._tracker = self._cuvslam.Tracker(rig_params, tracker_params)
logger.info("cuVSLAM tracker initialised in Inertial mode")
logger.info("cuvslam_tracker_initialised", mode="inertial")
except Exception as exc:
logger.error("cuVSLAM tracker init failed: %s", exc)
logger.error("cuvslam_tracker_init_failed", error=str(exc))
self._cuvslam = None
@property
@@ -156,7 +156,7 @@ class CuVSLAMVisualOdometry(ISequentialVisualOdometry):
scale_ambiguous=False, # VO-04: cuVSLAM Inertial mode = metric NED
)
except Exception as exc:
logger.error("cuVSLAM tracking step failed: %s", exc)
logger.error("cuvslam_tracking_step_failed", error=str(exc))
return None
@@ -205,9 +205,9 @@ class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry):
import cuvslam # type: ignore
self._cuvslam = cuvslam
self._init_tracker()
logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM SDK loaded (Jetson Mono-Depth mode)")
logger.info("cuvslam_sdk_loaded", backend="CuVSLAMMonoDepthVisualOdometry", mode="mono_depth")
except ImportError:
logger.info("CuVSLAMMonoDepthVisualOdometry: cuVSLAM not available — using scaled ORB fallback")
logger.info("cuvslam_sdk_unavailable", backend="CuVSLAMMonoDepthVisualOdometry", fallback="scaled_ORB")
def update_depth_hint(self, depth_hint_m: float) -> None:
"""Update barometric altitude used for scale recovery. Call each frame."""
@@ -231,11 +231,11 @@ class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry):
tracker_params.use_imu = False
tracker_params.odometry_mode = self._cuvslam.OdometryMode.MONO_DEPTH
self._tracker = self._cuvslam.Tracker(rig_params, tracker_params)
logger.info("cuVSLAM tracker initialised in Mono-Depth mode")
logger.info("cuvslam_tracker_initialised", mode="mono_depth")
except Exception:
logger.exception(
"cuVSLAM Mono-Depth tracker init FAILED — falling back to ORB. "
"Production Jetson path is DISABLED until this is fixed."
"cuvslam_mono_depth_init_failed",
note="Production Jetson path is DISABLED until this is fixed.",
)
self._cuvslam = None
@@ -277,7 +277,7 @@ class CuVSLAMMonoDepthVisualOdometry(ISequentialVisualOdometry):
scale_ambiguous=False,
)
except Exception:
logger.exception("cuVSLAM Mono-Depth tracking step failed — frame dropped")
logger.exception("cuvslam_mono_depth_tracking_failed", note="frame dropped")
return None
def _compute_via_orb_scaled(
@@ -14,18 +14,18 @@ optional-import block isolated.
"""
from __future__ import annotations
import logging
from typing import Optional
import cv2
import numpy as np
import structlog
from gps_denied.components.vio.protocol import ISequentialVisualOdometry
from gps_denied.core.models import IModelManager
from gps_denied.schemas import CameraParameters
from gps_denied.schemas.vo import Features, Matches, Motion, RelativePose
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
class SequentialVisualOdometry(ISequentialVisualOdometry):
@@ -88,7 +88,7 @@ class SequentialVisualOdometry(ISequentialVisualOdometry):
pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0
)
except Exception as e:
logger.error(f"Error finding essential matrix: {e}")
logger.error("essential_matrix_failed", error=str(e))
return None
if E is None or E.shape != (3, 3):
@@ -98,14 +98,14 @@ class SequentialVisualOdometry(ISequentialVisualOdometry):
inlier_count = np.sum(inliers_mask)
if inlier_count < inlier_threshold:
logger.warning(f"Insufficient inliers: {inlier_count} < {inlier_threshold}")
logger.warning("insufficient_inliers", n_inliers=int(inlier_count), threshold=inlier_threshold)
return None
# Recover pose
try:
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
except Exception as e:
logger.error(f"Error recovering pose: {e}")
logger.error("recover_pose_failed", error=str(e))
return None
return Motion(
@@ -224,7 +224,7 @@ class ORBVisualOdometry(ISequentialVisualOdometry):
try:
E, inliers = cv2.findEssentialMat(pts1, pts2, cameraMatrix=K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
except Exception as exc:
logger.warning("ORB findEssentialMat failed: %s", exc)
logger.warning("orb_essential_matrix_failed", error=str(exc))
return None
if E is None or E.shape != (3, 3) or inliers is None:
return None
@@ -235,7 +235,7 @@ class ORBVisualOdometry(ISequentialVisualOdometry):
try:
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, cameraMatrix=K, mask=inliers)
except Exception as exc:
logger.warning("ORB recoverPose failed: %s", exc)
logger.warning("orb_recover_pose_failed", error=str(exc))
return None
return Motion(translation=t.flatten(), rotation=R, inliers=inlier_mask, inlier_count=inlier_count)
+5 -4
View File
@@ -1,14 +1,15 @@
"""Route Chunk Manager (Component F12)."""
import logging
import uuid
from typing import Dict, List, Optional, Protocol, runtime_checkable
import structlog
from gps_denied.core.graph import IFactorGraphOptimizer
from gps_denied.schemas.chunk import ChunkHandle, ChunkStatus
from gps_denied.schemas.metric import Sim3Transform
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
@runtime_checkable
@@ -67,7 +68,7 @@ class RouteChunkManager(IRouteChunkManager):
)
self._chunks[flight_id][chunk_id] = handle
logger.info(f"Created new chunk {chunk_id} starting at frame {start_frame_id}")
logger.info("chunk_created", chunk_id=chunk_id, start_frame_id=start_frame_id)
return handle
def get_active_chunk(self, flight_id: str) -> Optional[ChunkHandle]:
@@ -123,7 +124,7 @@ class RouteChunkManager(IRouteChunkManager):
new_chunk.matching_status = ChunkStatus.MERGED
new_chunk.is_active = False
logger.info(f"Merged chunk {new_chunk_id} into {main_chunk_id}")
logger.info("chunks_merged", new_chunk_id=new_chunk_id, main_chunk_id=main_chunk_id)
return True
return False
+6 -5
View File
@@ -2,10 +2,10 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import structlog
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import (
@@ -18,7 +18,7 @@ from gps_denied.schemas.eskf import (
if TYPE_CHECKING:
from gps_denied.core.coordinates import CoordinateTransformer
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
@@ -171,7 +171,7 @@ class ESKF:
return
dt = imu.timestamp - self._last_timestamp
if dt <= 0 or dt > 1.0:
logger.debug("Skipping IMU prediction: dt=%.4f", dt)
logger.debug("imu_prediction_skipped", dt=round(dt, 4))
return
cfg = self.config
@@ -279,8 +279,9 @@ class ESKF:
# Mahalanobis outlier gate
mahal_sq = float(z @ S_inv @ z)
if mahal_sq > self.config.mahalanobis_threshold:
logger.warning("Satellite outlier rejected: Mahalanobis² %.1f > %.1f",
mahal_sq, self.config.mahalanobis_threshold)
logger.warning("satellite_outlier_rejected",
mahalanobis_sq=round(mahal_sq, 1),
threshold=self.config.mahalanobis_threshold)
return z
K = self._P @ H_sat.T @ S_inv
+5 -5
View File
@@ -1,10 +1,10 @@
"""Factor Graph Optimizer (Component F10)."""
import logging
from datetime import datetime, timezone
from typing import Dict, Protocol, runtime_checkable
import numpy as np
import structlog
try:
import gtsam
@@ -17,7 +17,7 @@ from gps_denied.schemas.graph import FactorGraphConfig, OptimizationResult, Pose
from gps_denied.schemas.metric import Sim3Transform
from gps_denied.schemas.vo import RelativePose
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
@runtime_checkable
@@ -142,7 +142,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
new_t = gtsam.Point3(float(t[0]), float(t[1]), float(t[2]))
state["initial"].insert(key_j, gtsam.Pose3(gtsam.Rot3(), new_t))
except Exception as exc:
logger.debug("GTSAM add_relative_factor failed: %s", exc)
logger.debug("gtsam_add_relative_factor_failed", error=str(exc))
return True
@@ -181,7 +181,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
if not state["initial"].exists(key):
state["initial"].insert(key, prior)
except Exception as exc:
logger.debug("GTSAM add_absolute_factor failed: %s", exc)
logger.debug("gtsam_add_absolute_factor_failed", error=str(exc))
return True
@@ -219,7 +219,7 @@ class FactorGraphOptimizer(IFactorGraphOptimizer):
state["graph"] = gtsam.NonlinearFactorGraph()
state["initial"] = gtsam.Values()
except Exception as exc:
logger.warning("GTSAM ISAM2 update failed: %s", exc)
logger.warning("gtsam_isam2_update_failed", error=str(exc))
state["dirty"] = False
return OptimizationResult(
+3 -3
View File
@@ -1,16 +1,16 @@
"""Failure Recovery Coordinator (Component F11)."""
import logging
from typing import List, Protocol, runtime_checkable
import numpy as np
import structlog
from gps_denied.core.chunk_manager import IRouteChunkManager
from gps_denied.core.gpr import IGlobalPlaceRecognition
from gps_denied.core.metric import IMetricRefinement
from gps_denied.schemas.chunk import ChunkStatus
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
@runtime_checkable
@@ -37,7 +37,7 @@ class FailureRecoveryCoordinator(IFailureRecoveryCoordinator):
def handle_tracking_lost(self, flight_id: str, current_frame_id: int) -> bool:
"""Called when F07 fails to find sequential matches."""
logger.warning(f"Tracking lost for flight {flight_id} at frame {current_frame_id}")
logger.warning("tracking_lost", flight_id=flight_id, current_frame_id=current_frame_id)
# Create a new active chunk to record new relative frames independently
self.chunk_manager.create_new_chunk(flight_id, current_frame_id)
+67
View File
@@ -0,0 +1,67 @@
"""structlog configuration. Call ``configure_logging`` once at app boot.
Per Phase 2 / OBS-01: hot path uses structlog with ``correlation_id`` (= frame_id)
bound at frame entry. Non-hot-path code keeps stdlib ``logging`` until Phase 6
(api/, db/, scripts/, composition.py at startup, core/models.py engine load).
The stdlib bridge below lets stdlib records flow through the same renderer.
"""
from __future__ import annotations
import logging
from typing import Literal
import orjson
import structlog
_Env = Literal["jetson", "x86_dev", "ci", "sitl"]
_configured = False
def configure_logging(env: _Env, level: int = logging.INFO) -> None:
"""Configure structlog ONCE at app boot. Idempotent — repeat calls no-op.
Args:
env: Deployment environment. Controls renderer:
- ``x86_dev`` -> pretty console renderer
- ``jetson|ci|sitl`` -> JSON renderer (orjson) + bytes logger factory
level: Stdlib log level threshold. ``filtering_bound_logger`` short-circuits
sub-level calls in ~50-100 ns. Keep at INFO in production.
"""
global _configured
if _configured:
return
shared_processors: list = [
# MUST be first — pulls bound frame_id into every record.
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.format_exc_info,
]
if env == "x86_dev":
processors = [*shared_processors, structlog.dev.ConsoleRenderer()]
logger_factory = structlog.PrintLoggerFactory()
else:
# jetson | ci | sitl — JSON to bytes via orjson; fastest path on hot loop.
processors = [
*shared_processors,
structlog.processors.JSONRenderer(serializer=orjson.dumps),
]
logger_factory = structlog.BytesLoggerFactory()
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(level),
logger_factory=logger_factory,
cache_logger_on_first_use=True,
)
# Bridge stdlib logging: api/, db/, scripts/, composition.py, core/models.py
# (Phase 6 ports these to structlog directly). Until then, their records share
# the same level threshold; format passthrough is via "%(message)s" because the
# structlog renderer above is the actual output sink.
logging.basicConfig(level=level, format="%(message)s")
_configured = True
+6
View File
@@ -17,6 +17,8 @@ from __future__ import annotations
import logging
from typing import Optional
from gps_denied.obs import configure_logging
logger = logging.getLogger(__name__)
@@ -49,6 +51,10 @@ def build_pipeline(
FlightProcessor
Fully wired processor with all components attached.
"""
# OBS-01: configure structlog idempotently so tests / scripts calling
# build_pipeline directly (without app.py lifespan) still get logging configured.
configure_logging(env=env)
# Lazy imports to avoid circular import chains at module load time.
from gps_denied.components.gpr.faiss_gpr import GlobalPlaceRecognition
from gps_denied.components.mavlink_io.pymavlink_bridge import MAVLinkBridge
+38 -16
View File
@@ -7,12 +7,13 @@ State Machine: NORMAL → LOST → RECOVERY → NORMAL.
from __future__ import annotations
import asyncio
import logging
import time
from enum import Enum
from typing import Optional
import numpy as np
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
from gps_denied.core.eskf import ESKF
from gps_denied.pipeline.image_input import ImageInputPipeline
@@ -36,7 +37,7 @@ from gps_denied.schemas.flight import (
Waypoint,
)
logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
@@ -178,6 +179,23 @@ class FlightProcessor:
PIPE-05: ImageRotationManager initialised on first frame
PIPE-07: ESKF confidence → MAVLink fix_type via bridge.update_state
"""
# OBS-01: bind correlation_id=frame_id for the duration of this frame.
# All hot-path log calls (in this file + the 9 component files) auto-include
# frame_id and flight_id via structlog.contextvars.merge_contextvars.
clear_contextvars()
bind_contextvars(correlation_id=frame_id, flight_id=flight_id)
try:
return await self._process_frame_inner(flight_id, frame_id, image)
finally:
clear_contextvars()
async def _process_frame_inner(
self,
flight_id: str,
frame_id: int,
image,
):
"""Inner frame processing — called with correlation_id already bound."""
result = FrameResult(frame_id)
state = self._flight_states.get(flight_id, TrackingState.NORMAL)
eskf = self._eskf.get(flight_id)
@@ -187,6 +205,8 @@ class FlightProcessor:
resolution_width=640, resolution_height=480,
)
logger.info("frame_started", state=state.value)
# ---- PIPE-05: Initialise heading tracking on first frame ----
if self._rotation and frame_id == 0:
self._rotation.requires_rotation_sweep(flight_id) # seeds HeadingHistory
@@ -214,7 +234,7 @@ class FlightProcessor:
dt_vo = max(0.01, now - (eskf.last_timestamp or now))
eskf.update_vo(rel_pose.translation, dt_vo)
except Exception as exc:
logger.warning("VO failed for frame %d: %s", frame_id, exc)
logger.warning("vo_failed", error=str(exc))
# Store current image for next frame
self._prev_images[flight_id] = image
@@ -229,7 +249,7 @@ class FlightProcessor:
if state == TrackingState.NORMAL:
if not vo_ok and frame_id > 0:
state = TrackingState.LOST
logger.info("Flight %s → LOST at frame %d", flight_id, frame_id)
logger.info("flight_state_change", new_state="LOST")
if self._recovery:
self._recovery.handle_tracking_lost(flight_id, frame_id)
@@ -249,7 +269,7 @@ class FlightProcessor:
result.alignment_success = True
# PIPE-04: Reset failure count on successful recovery
self._failure_counts[flight_id] = 0
logger.info("Flight %s recovered → NORMAL at frame %d", flight_id, frame_id)
logger.info("flight_state_change", new_state="NORMAL", source="recovery")
# ---- 3. Satellite position fix (PIPE-01/02) ----
if state == TrackingState.NORMAL and self._metric:
@@ -274,7 +294,7 @@ class FlightProcessor:
if tile_result:
sat_tile, tile_bounds = tile_result
except Exception as exc:
logger.debug("Satellite tile fetch failed: %s", exc)
logger.debug("satellite_tile_fetch_failed", error=str(exc))
# Fallback: GPR candidate tile (mock image, real bounds)
if sat_tile is None and self._gpr:
@@ -284,7 +304,7 @@ class FlightProcessor:
sat_tile = np.zeros((256, 256, 3), dtype=np.uint8)
tile_bounds = candidates[0].bounds
except Exception as exc:
logger.debug("GPR tile fallback failed: %s", exc)
logger.debug("gpr_tile_fallback_failed", error=str(exc))
if sat_tile is not None and tile_bounds is not None:
try:
@@ -310,16 +330,17 @@ class FlightProcessor:
noise_m = 5.0 + 15.0 * (1.0 - float(align.confidence))
eskf.update_satellite(pos_enu, noise_m)
except Exception as exc:
logger.debug("ESKF satellite update failed: %s", exc)
logger.debug("eskf_satellite_update_failed", error=str(exc))
except Exception as exc:
logger.warning("Metric alignment failed at frame %d: %s", frame_id, exc)
logger.warning("metric_alignment_failed", error=str(exc))
# ---- 4. Graph optimization (incremental) ----
if self._graph:
opt_result = self._graph.optimize(flight_id, iterations=5)
logger.debug(
"Optimization: converged=%s, error=%.4f",
opt_result.converged, opt_result.final_error,
"graph_optimized",
converged=opt_result.converged,
error=round(opt_result.final_error, 4),
)
# ---- PIPE-07: Push ESKF state → MAVLink GPS_INPUT ----
@@ -329,11 +350,12 @@ class FlightProcessor:
alt = self._altitudes.get(flight_id, 100.0)
self._mavlink.update_state(eskf_state, altitude_m=alt)
except Exception as exc:
logger.debug("MAVLink state push failed: %s", exc)
logger.debug("mavlink_state_push_failed", error=str(exc))
# ---- 5. Publish via SSE ----
result.tracking_state = state
self._flight_states[flight_id] = state
logger.info("frame_complete", tracking_state=state.value, alignment=result.alignment_success)
await self._publish_frame_result(flight_id, result)
return result
@@ -393,7 +415,7 @@ class FlightProcessor:
try:
asyncio.create_task(self._mavlink.start(req.start_gps))
except Exception as exc:
logger.warning("MAVLink bridge start failed: %s", exc)
logger.warning("mavlink_bridge_start_failed", error=str(exc))
return FlightResponse(
flight_id=flight.id,
@@ -532,9 +554,9 @@ class FlightProcessor:
alt = self._altitudes.get(flight_id, 100.0)
eskf.update_satellite(np.array([e, n, alt]), noise_meters=500.0)
self._failure_counts[flight_id] = 0
logger.info("User fix applied for %s: %s", flight_id, req.satellite_gps)
logger.info("user_fix_applied", flight_id=flight_id, gps=str(req.satellite_gps))
except Exception as exc:
logger.warning("User fix ESKF injection failed: %s", exc)
logger.warning("user_fix_eskf_failed", error=str(exc))
return UserFixResponse(
accepted=True, processing_resumed=True, message="Fix applied."
@@ -580,7 +602,7 @@ class FlightProcessor:
quaternion=quat,
)
except Exception as exc:
logger.debug("pixel_to_gps failed: %s", exc)
logger.debug("pixel_to_gps_failed", error=str(exc))
# Fallback: return ESKF position projected to ground (no pixel shift)
if gps is None and eskf:
+90
View File
@@ -9,6 +9,96 @@ from gps_denied.core.coordinates import CoordinateTransformer
from gps_denied.core.models import ModelManager
from gps_denied.schemas import CameraParameters, GPSPoint
# ---------------------------------------------------------------
# Phase 2 / TEST-03: AC traceability plugin
#
# Registers categorical markers (defensive — pyproject.toml is primary), validates
# @pytest.mark.ac() arguments against the canonical AC-ID regex at collection time,
# and (when --ac-dump=<path> is supplied) writes a {ac_id: [test_nodeid, ...]} JSON
# at session end for `scripts/gen_ac_traceability.py` to consume.
#
# See RESEARCH.md §2.1 for the canonical implementation and rationale.
# ---------------------------------------------------------------
import json
import re
from collections import defaultdict
from pathlib import Path
_AC_ID_RE = re.compile(r"^AC-(?:\d+\.\d+[a-z]?|NEW-\d+)$")
def pytest_configure(config):
"""Defensive marker registration. Primary registration lives in pyproject.toml,
but doing it here too means a future maintainer who drops the pyproject markers
list does not silently break --strict-markers."""
for line in (
"unit: pure-math or single-class test; no I/O",
"integration: cross-subsystem test; in-memory SQLite / ASGI / full wiring",
"blackbox: validates external contract without a live producer",
"sitl: requires ARDUPILOT_SITL_HOST — nightly only",
"e2e: full-pipeline run against a real dataset — nightly only",
"ac(ac_id): link test to one or more Acceptance Criteria (e.g. AC-1.1, AC-NEW-3)",
):
config.addinivalue_line("markers", line)
def pytest_addoption(parser):
parser.addoption(
"--ac-dump",
action="store",
default=None,
help="Path to write the {ac_id: [test_nodeid, ...]} JSON at session end. "
"Consumed by scripts/gen_ac_traceability.py.",
)
def pytest_collection_modifyitems(config, items):
"""Validate @pytest.mark.ac(...) arguments against the canonical AC-ID regex.
Fail collection (rather than emit a runtime error) so AC-ID typos surface immediately.
The traceability script's --check mode catches forward orphans (AC without test);
this hook catches backward orphans (test references non-existent AC) by enforcing
the syntactic AC-ID shape. Semantic existence (AC ID is declared in the AC doc) is
enforced by the script in Plan 02-04.
"""
errors = []
for item in items:
for mark in item.iter_markers(name="ac"):
if not mark.args:
errors.append(
f"{item.nodeid}: @pytest.mark.ac() requires at least one AC ID arg"
)
continue
for arg in mark.args:
if not isinstance(arg, str) or not _AC_ID_RE.match(arg):
errors.append(
f"{item.nodeid}: @pytest.mark.ac({arg!r}) — must match "
f"AC-X.Y or AC-NEW-N (e.g. 'AC-1.1', 'AC-NEW-3')"
)
if errors:
raise pytest.UsageError(
"AC marker validation failed:\n " + "\n ".join(errors)
)
def pytest_sessionfinish(session, exitstatus):
"""Dump {ac_id: [test_nodeid, ...]} to --ac-dump path if supplied.
Runs even when pytest is invoked with --collect-only (session.items is populated
before tests execute), so scripts/gen_ac_traceability.py can dump in ~seconds on a
full suite.
"""
dump_path = session.config.getoption("--ac-dump", default=None)
if not dump_path:
return
mapping: dict[str, list[str]] = defaultdict(list)
for item in session.items:
for mark in item.iter_markers(name="ac"):
for ac_id in mark.args:
mapping[ac_id].append(item.nodeid)
Path(dump_path).write_text(json.dumps(dict(sorted(mapping.items())), indent=2))
# ---------------------------------------------------------------
# Common constants
# ---------------------------------------------------------------