refactor(01-07): factor_graph, pipeline pkg, testing/benchmark, Protocol ABCs

- Create core/factor_graph.py: IFactorGraphOptimizer converted to Protocol
- Shim core/graph.py to re-export from core/factor_graph
- Create pipeline/ package: orchestrator, image_input, result_manager, sse_streamer
- Shim core/{processor,pipeline,results,sse}.py to re-export from pipeline/
- Create testing/benchmark.py; shim core/benchmark.py
- Convert IRouteChunkManager, IFailureRecoveryCoordinator, IModelManager, IImageMatcher to Protocol
- Update pyproject.toml ruff per-file-ignores to new paths
- All 216 tests pass (regression floor maintained)
This commit is contained in:
Yuzviak
2026-05-11 08:59:07 +03:00
parent 275c7b4642
commit 5a60c1ee2c
18 changed files with 1857 additions and 1836 deletions
+8 -371
View File
@@ -1,371 +1,8 @@
"""Accuracy Benchmark (Phase 7).
Provides:
- SyntheticTrajectory — generates a realistic fixed-wing UAV flight path
with ground-truth GPS + noisy sensor data.
- AccuracyBenchmark — replays a trajectory through the ESKF pipeline
and computes position-error statistics.
Acceptance criteria (from solution.md):
AC-PERF-1: 80 % of frames within 50 m of ground truth.
AC-PERF-2: 60 % of frames within 20 m of ground truth.
AC-PERF-3: End-to-end per-frame latency < 400 ms.
AC-PERF-4: VO drift over 1 km straight segment (no sat correction) < 100 m.
"""
from __future__ import annotations
import math
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
import numpy as np
from gps_denied.core.coordinates import CoordinateTransformer
from gps_denied.core.eskf import ESKF
from gps_denied.schemas import GPSPoint
from gps_denied.schemas.eskf import ESKFConfig, IMUMeasurement
# ---------------------------------------------------------------------------
# Synthetic trajectory
# ---------------------------------------------------------------------------
@dataclass
class TrajectoryFrame:
"""One simulated camera frame with ground-truth and noisy sensor data."""
frame_id: int
timestamp: float
true_position_enu: np.ndarray # (3,) East, North, Up in metres
true_gps: GPSPoint # WGS84 from true ENU
imu_measurements: list[IMUMeasurement] # High-rate IMU between frames
vo_translation: Optional[np.ndarray] # Noisy relative displacement (3,)
vo_tracking_good: bool = True
@dataclass
class SyntheticTrajectoryConfig:
"""Parameters for trajectory generation."""
# Origin (mission start)
origin: GPSPoint = field(default_factory=lambda: GPSPoint(lat=49.0, lon=32.0))
altitude_m: float = 600.0 # Constant AGL altitude (m)
# UAV speed and heading
speed_mps: float = 20.0 # ~70 km/h (typical fixed-wing)
heading_deg: float = 45.0 # Initial heading (degrees CW from North)
camera_fps: float = 0.7 # ADTI 20L V1 camera rate (Hz)
imu_hz: float = 200.0 # IMU sample rate
num_frames: int = 50 # Number of camera frames to simulate
# Noise parameters
vo_noise_m: float = 0.5 # VO translation noise (sigma, metres)
imu_accel_noise: float = 0.01 # Accelerometer noise sigma (m/s²)
imu_gyro_noise: float = 0.001 # Gyroscope noise sigma (rad/s)
# Failure injection
vo_failure_frames: list[int] = field(default_factory=list)
# Waypoints for heading changes (ENU East, North metres from origin)
waypoints_enu: list[tuple[float, float]] = field(default_factory=list)
class SyntheticTrajectory:
"""Generate a synthetic fixed-wing UAV flight with ground truth + noisy sensors."""
def __init__(self, config: SyntheticTrajectoryConfig | None = None):
self.config = config or SyntheticTrajectoryConfig()
self._coord = CoordinateTransformer()
self._flight_id = "__synthetic__"
self._coord.set_enu_origin(self._flight_id, self.config.origin)
def generate(self) -> list[TrajectoryFrame]:
"""Generate all trajectory frames."""
cfg = self.config
dt_camera = 1.0 / cfg.camera_fps
dt_imu = 1.0 / cfg.imu_hz
imu_steps = int(dt_camera * cfg.imu_hz)
frames: list[TrajectoryFrame] = []
pos = np.array([0.0, 0.0, cfg.altitude_m])
vel = self._heading_to_enu_vel(cfg.heading_deg, cfg.speed_mps)
prev_pos = pos.copy()
t = time.time()
waypoints = list(cfg.waypoints_enu) # copy
for fid in range(cfg.num_frames):
# --- Waypoint steering ---
if waypoints:
wp_e, wp_n = waypoints[0]
to_wp = np.array([wp_e - pos[0], wp_n - pos[1], 0.0])
dist_wp = np.linalg.norm(to_wp[:2])
if dist_wp < cfg.speed_mps * dt_camera:
waypoints.pop(0)
else:
heading_rad = math.atan2(to_wp[0], to_wp[1]) # ENU: E=X, N=Y
vel = np.array([
cfg.speed_mps * math.sin(heading_rad),
cfg.speed_mps * math.cos(heading_rad),
0.0,
])
# --- Simulate IMU between frames ---
imu_list: list[IMUMeasurement] = []
for step in range(imu_steps):
ts = t + step * dt_imu
# Body-frame acceleration (mostly gravity correction, small forward accel)
accel_true = np.array([0.0, 0.0, 9.81]) # gravity compensation
gyro_true = np.zeros(3)
imu = IMUMeasurement(
accel=accel_true + np.random.randn(3) * cfg.imu_accel_noise,
gyro=gyro_true + np.random.randn(3) * cfg.imu_gyro_noise,
timestamp=ts,
)
imu_list.append(imu)
# --- Propagate position ---
prev_pos = pos.copy()
pos = pos + vel * dt_camera
t += dt_camera
# --- True GPS from ENU position ---
true_gps = self._coord.enu_to_gps(
self._flight_id, (float(pos[0]), float(pos[1]), float(pos[2]))
)
# --- VO measurement (relative displacement + noise) ---
true_displacement = pos - prev_pos
vo_tracking_good = fid not in cfg.vo_failure_frames
if vo_tracking_good:
noisy_displacement = true_displacement + np.random.randn(3) * cfg.vo_noise_m
noisy_displacement[2] = 0.0 # monocular VO is scale-ambiguous in Z
else:
noisy_displacement = None
frames.append(TrajectoryFrame(
frame_id=fid,
timestamp=t,
true_position_enu=pos.copy(),
true_gps=true_gps,
imu_measurements=imu_list,
vo_translation=noisy_displacement,
vo_tracking_good=vo_tracking_good,
))
return frames
@staticmethod
def _heading_to_enu_vel(heading_deg: float, speed_mps: float) -> np.ndarray:
"""Convert heading (degrees CW from North) to ENU velocity vector."""
rad = math.radians(heading_deg)
return np.array([
speed_mps * math.sin(rad), # East
speed_mps * math.cos(rad), # North
0.0, # Up
])
# ---------------------------------------------------------------------------
# Accuracy Benchmark
# ---------------------------------------------------------------------------
@dataclass
class BenchmarkResult:
"""Position error statistics over a trajectory replay."""
errors_m: list[float] # Per-frame horizontal error in metres
latencies_ms: list[float] # Per-frame process time in ms
frames_total: int
frames_with_good_estimate: int
@property
def p80_error_m(self) -> float:
"""80th percentile position error (metres)."""
return float(np.percentile(self.errors_m, 80)) if self.errors_m else float("inf")
@property
def p60_error_m(self) -> float:
"""60th percentile position error (metres)."""
return float(np.percentile(self.errors_m, 60)) if self.errors_m else float("inf")
@property
def median_error_m(self) -> float:
"""Median position error (metres)."""
return float(np.median(self.errors_m)) if self.errors_m else float("inf")
@property
def max_error_m(self) -> float:
return float(max(self.errors_m)) if self.errors_m else float("inf")
@property
def p95_latency_ms(self) -> float:
"""95th percentile frame latency (ms)."""
return float(np.percentile(self.latencies_ms, 95)) if self.latencies_ms else float("inf")
@property
def pct_within_50m(self) -> float:
"""Fraction of frames within 50 m error."""
if not self.errors_m:
return 0.0
return sum(e <= 50.0 for e in self.errors_m) / len(self.errors_m)
@property
def pct_within_20m(self) -> float:
"""Fraction of frames within 20 m error."""
if not self.errors_m:
return 0.0
return sum(e <= 20.0 for e in self.errors_m) / len(self.errors_m)
def passes_acceptance_criteria(self) -> tuple[bool, dict[str, bool]]:
"""Check all solution.md acceptance criteria.
Returns (overall_pass, per_criterion_dict).
"""
checks = {
"AC-PERF-1: 80% within 50m": self.pct_within_50m >= 0.80,
"AC-PERF-2: 60% within 20m": self.pct_within_20m >= 0.60,
"AC-PERF-3: p95 latency < 400ms": self.p95_latency_ms < 400.0,
}
overall = all(checks.values())
return overall, checks
def summary(self) -> str:
overall, checks = self.passes_acceptance_criteria()
lines = [
f"Frames: {self.frames_total} | with estimate: {self.frames_with_good_estimate}",
f"Error — median: {self.median_error_m:.1f}m p80: {self.p80_error_m:.1f}m "
f"p60: {self.p60_error_m:.1f}m max: {self.max_error_m:.1f}m",
f"Within 50m: {self.pct_within_50m*100:.1f}% | within 20m: {self.pct_within_20m*100:.1f}%",
f"Latency p95: {self.p95_latency_ms:.1f}ms",
"",
"Acceptance criteria:",
]
for criterion, passed in checks.items():
lines.append(f" {'PASS' if passed else 'FAIL'} {criterion}")
lines.append(f"\nOverall: {'PASS' if overall else 'FAIL'}")
return "\n".join(lines)
class AccuracyBenchmark:
"""Replays a SyntheticTrajectory through the ESKF and measures accuracy.
The benchmark uses only the ESKF (no full FlightProcessor) for speed.
Satellite corrections are injected optionally via sat_correction_fn.
"""
def __init__(
self,
eskf_config: ESKFConfig | None = None,
sat_correction_fn: Optional[Callable[[TrajectoryFrame], Optional[np.ndarray]]] = None,
):
"""
Args:
eskf_config: ESKF tuning parameters.
sat_correction_fn: Optional callback(frame) → ENU position or None.
Called on keyframes to inject satellite corrections.
If None, no satellite corrections are applied.
"""
self.eskf_config = eskf_config or ESKFConfig()
self.sat_correction_fn = sat_correction_fn
def run(
self,
trajectory: list[TrajectoryFrame],
origin: GPSPoint,
satellite_keyframe_interval: int = 7,
) -> BenchmarkResult:
"""Replay trajectory frames through ESKF, collect errors and latencies.
Args:
trajectory: List of TrajectoryFrame (from SyntheticTrajectory).
origin: WGS84 reference origin for ENU.
satellite_keyframe_interval: Apply satellite correction every N frames.
"""
coord = CoordinateTransformer()
flight_id = "__benchmark__"
coord.set_enu_origin(flight_id, origin)
eskf = ESKF(self.eskf_config)
# Init at origin with HIGH uncertainty
eskf.initialize(np.array([0.0, 0.0, trajectory[0].true_position_enu[2]]),
trajectory[0].timestamp)
errors_m: list[float] = []
latencies_ms: list[float] = []
frames_with_estimate = 0
for frame in trajectory:
t_frame_start = time.perf_counter()
# --- IMU prediction ---
for imu in frame.imu_measurements:
eskf.predict(imu)
# --- VO update ---
if frame.vo_tracking_good and frame.vo_translation is not None:
dt_vo = 1.0 / 0.7 # camera interval
eskf.update_vo(frame.vo_translation, dt_vo)
# --- Satellite update (keyframes) ---
if frame.frame_id % satellite_keyframe_interval == 0:
sat_pos_enu: Optional[np.ndarray] = None
if self.sat_correction_fn is not None:
sat_pos_enu = self.sat_correction_fn(frame)
else:
# Default: inject ground-truth position + realistic noise
noise_m = 10.0
sat_pos_enu = (
frame.true_position_enu[:3]
+ np.random.randn(3) * noise_m
)
sat_pos_enu[2] = frame.true_position_enu[2] # keep altitude
if sat_pos_enu is not None:
# Tell ESKF the measurement noise matches what we inject
eskf.update_satellite(sat_pos_enu, noise_meters=noise_m)
latency_ms = (time.perf_counter() - t_frame_start) * 1000.0
latencies_ms.append(latency_ms)
# --- Compute horizontal error vs ground truth ---
if eskf.initialized and eskf._nominal_state is not None:
est_pos = eskf._nominal_state["position"]
true_pos = frame.true_position_enu
horiz_error = float(np.linalg.norm(est_pos[:2] - true_pos[:2]))
errors_m.append(horiz_error)
frames_with_estimate += 1
else:
errors_m.append(float("inf"))
return BenchmarkResult(
errors_m=errors_m,
latencies_ms=latencies_ms,
frames_total=len(trajectory),
frames_with_good_estimate=frames_with_estimate,
)
def run_vo_drift_test(
self,
trajectory_length_m: float = 1000.0,
speed_mps: float = 20.0,
) -> float:
"""Measure VO drift over a straight segment with NO satellite correction.
Returns final horizontal position error in metres.
Per solution.md, this should be < 100m over 1km.
"""
fps = 0.7
num_frames = max(10, int(trajectory_length_m / speed_mps * fps))
cfg = SyntheticTrajectoryConfig(
speed_mps=speed_mps,
heading_deg=0.0, # straight North
camera_fps=fps,
num_frames=num_frames,
vo_noise_m=0.3, # cuVSLAM-grade VO noise
)
traj_gen = SyntheticTrajectory(cfg)
frames = traj_gen.generate()
# No satellite corrections
benchmark_no_sat = AccuracyBenchmark(
eskf_config=self.eskf_config,
sat_correction_fn=lambda _: None, # suppress all satellite updates
)
result = benchmark_no_sat.run(frames, cfg.origin, satellite_keyframe_interval=9999)
# Return final-frame error
return result.errors_m[-1] if result.errors_m else float("inf")
"""Legacy import path. Phase 1 shim — code lives in testing/benchmark.py."""
from gps_denied.testing.benchmark import ( # noqa: F401
AccuracyBenchmark,
BenchmarkResult,
SyntheticTrajectory,
SyntheticTrajectoryConfig,
TrajectoryFrame,
)