import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple

import yaml
from pydantic import BaseModel, Field

from f02_1_flight_lifecycle_manager import CameraParameters

logger = logging.getLogger(__name__)

# Expected displacement between consecutive frames, in metres.
# 100 m is the default used for wing-type UAVs.
_DEFAULT_FRAME_SPACING_M = 100.0


# --- Data Models ---

class ValidationResult(BaseModel):
    """Outcome of a configuration validation pass."""

    is_valid: bool
    # Human-readable description of every failed check; empty when valid.
    errors: List[str] = Field(default_factory=list)


class OperationalArea(BaseModel):
    """Named latitude/longitude bounding box the system operates in."""

    name: str = "Eastern Ukraine"
    # Bounds are checked against [-90, 90] / [-180, 180] by the manager.
    min_lat: float = 45.0
    max_lat: float = 52.0
    min_lon: float = 22.0
    max_lon: float = 40.0


class ModelPaths(BaseModel):
    """File paths of the model engine files."""

    superpoint: str = "models/superpoint.engine"
    lightglue: str = "models/lightglue.engine"
    dinov2: str = "models/dinov2.engine"
    litesam: str = "models/litesam.engine"


class DatabaseConfig(BaseModel):
    """Database connection settings."""

    url: str = "sqlite:///flights.db"


class APIConfig(BaseModel):
    """Host and port settings for the API."""

    host: str = "0.0.0.0"
    port: int = 8000


class SystemConfig(BaseModel):
    """System-wide configuration; every section except ``camera`` has defaults."""

    camera: CameraParameters
    operational_area: OperationalArea = Field(default_factory=OperationalArea)
    models: ModelPaths = Field(default_factory=ModelPaths)
    database: DatabaseConfig = Field(default_factory=DatabaseConfig)
    api: APIConfig = Field(default_factory=APIConfig)


class FlightConfig(BaseModel):
    """Configuration attached to a single flight."""

    camera_params: CameraParameters
    altitude: float
    operational_area: OperationalArea = Field(default_factory=OperationalArea)


# --- Interface ---

class IConfigurationManager(ABC):
    """Abstract interface for system and per-flight configuration access."""

    @abstractmethod
    def load_config(self, config_path: str) -> SystemConfig:
        """Load the system configuration from ``config_path``."""

    @abstractmethod
    def get_camera_params(self, camera_id: Optional[str] = None) -> CameraParameters:
        """Return camera parameters, optionally for a specific camera."""

    @abstractmethod
    def validate_config(self, config: SystemConfig) -> ValidationResult:
        """Validate ``config`` and report any problems found."""

    @abstractmethod
    def get_flight_config(self, flight_id: str) -> FlightConfig:
        """Return the configuration of the flight ``flight_id``."""

    @abstractmethod
    def update_config(self, section: str, key: str, value: Any) -> bool:
        """Set ``section.key`` to ``value``; return whether it was applied."""

    @abstractmethod
    def get_operational_altitude(self, flight_id: str) -> float:
        """Return the altitude configured for the flight ``flight_id``."""

    @abstractmethod
    def get_frame_spacing(self, flight_id: str) -> float:
        """Return the expected displacement between frames for a flight."""

    @abstractmethod
    def save_flight_config(self, flight_id: str, config: FlightConfig) -> bool:
        """Store ``config`` for ``flight_id``; return whether it was stored."""


# --- Implementation ---

class ConfigurationManager(IConfigurationManager):
    """
    F17: Configuration Manager
    Handles loading, validation, and runtime management of system-wide
    configuration and individual flight parameters.
    """

    def __init__(self, f03_database=None):
        """Create a manager.

        Args:
            f03_database: Optional database object. When provided it must
                expose ``get_flight_by_id(flight_id)``; it is used to build
                flight configurations that are not cached yet.
        """
        self.db = f03_database
        self._system_config: Optional[SystemConfig] = None
        self._flight_configs_cache: Dict[str, FlightConfig] = {}
        # Used for any camera field missing from the configuration file.
        self._default_camera = CameraParameters(
            focal_length_mm=25.0,
            sensor_width_mm=36.0,
            resolution={"width": 1920, "height": 1080},
        )

    # --- 17.01 Feature: System Configuration ---

    def _parse_yaml_file(self, path: str) -> Dict[str, Any]:
        """Read ``path`` as YAML and return its top-level mapping.

        Returns an empty dict when the file does not exist or is empty.

        Raises:
            ValueError: If the file is not valid YAML, or if its top level
                is not a mapping.
        """
        if not os.path.exists(path):
            logger.warning("Config file %s not found. Using defaults.", path)
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except yaml.YAMLError as e:
            raise ValueError(f"Malformed YAML in config file: {e}") from e
        if not data:
            return {}
        if not isinstance(data, dict):
            # A list or scalar at the top level would otherwise surface later
            # as an opaque AttributeError on ``.get``.
            raise ValueError(
                "Malformed YAML in config file: top level must be a mapping, "
                f"got {type(data).__name__}."
            )
        return data

    @staticmethod
    def _section(raw_data: Dict[str, Any], name: str) -> Dict[str, Any]:
        """Return the mapping stored under ``name``, or ``{}`` if absent/empty.

        Raises:
            ValueError: If the section is present but is not a mapping.
        """
        section = raw_data.get(name)
        if section is None:
            # Covers both a missing key and a key with an empty YAML value.
            return {}
        if not isinstance(section, dict):
            raise ValueError(
                f"Config section '{name}' must be a mapping, "
                f"got {type(section).__name__}."
            )
        return section

    @staticmethod
    def _field_values(obj: Any) -> Dict[str, Any]:
        """Return a copy of ``obj``'s instance attributes, or ``{}`` if it has none."""
        try:
            return dict(vars(obj))
        except TypeError:
            return {}

    def _apply_defaults(self, raw_data: Dict[str, Any]) -> SystemConfig:
        """Build a ``SystemConfig`` from raw data, filling gaps with defaults.

        Raises:
            ValueError: If a section is present but is not a mapping, or if
                model construction rejects a value.
        """
        cam_data = self._section(raw_data, "camera")
        defaults = self._default_camera
        camera = CameraParameters(
            focal_length_mm=cam_data.get("focal_length_mm", defaults.focal_length_mm),
            sensor_width_mm=cam_data.get("sensor_width_mm", defaults.sensor_width_mm),
            resolution=cam_data.get("resolution", defaults.resolution),
        )
        return SystemConfig(
            camera=camera,
            operational_area=OperationalArea(**self._section(raw_data, "operational_area")),
            models=ModelPaths(**self._section(raw_data, "models")),
            database=DatabaseConfig(**self._section(raw_data, "database")),
            api=APIConfig(**self._section(raw_data, "api")),
        )

    def _validate_camera_params(self, cam: CameraParameters, errors: List[str]):
        """Append an error for every non-positive camera parameter."""
        if cam.focal_length_mm <= 0:
            errors.append("Focal length must be positive.")
        if cam.sensor_width_mm <= 0:
            errors.append("Sensor width must be positive.")
        if cam.resolution.get("width", 0) <= 0 or cam.resolution.get("height", 0) <= 0:
            errors.append("Resolution dimensions must be positive.")

    def _validate_operational_area(self, area: OperationalArea, errors: List[str]):
        """Append an error if the bounds are out of range or inverted."""
        if not (-90.0 <= area.min_lat <= area.max_lat <= 90.0):
            errors.append("Invalid latitude bounds in operational area.")
        if not (-180.0 <= area.min_lon <= area.max_lon <= 180.0):
            errors.append("Invalid longitude bounds in operational area.")

    def _validate_paths(self, models: ModelPaths, errors: List[str]):
        """Append an error if the superpoint or dinov2 path is empty."""
        # In a strict environment, we might check os.path.exists() here.
        # For mock/dev, we just ensure they are non-empty strings.
        if not models.superpoint or not models.dinov2:
            errors.append("Critical model paths are missing.")

    def validate_config(self, config: SystemConfig) -> ValidationResult:
        """Run the camera, operational-area and model-path checks on ``config``.

        Returns:
            A ``ValidationResult`` that is valid only when no check failed.
        """
        errors: List[str] = []
        self._validate_camera_params(config.camera, errors)
        self._validate_operational_area(config.operational_area, errors)
        self._validate_paths(config.models, errors)
        return ValidationResult(is_valid=not errors, errors=errors)

    def load_config(self, config_path: str = "config.yaml") -> SystemConfig:
        """Load, validate and cache the system configuration.

        Args:
            config_path: Path of the YAML file. A missing file results in an
                all-defaults configuration.

        Raises:
            ValueError: If the file is malformed or validation fails.
        """
        raw_data = self._parse_yaml_file(config_path)

        # TODO: environment variable overrides (e.g. GOOGLE_MAPS_API_KEY)
        # could inject sensitive fields into raw_data here, before validation.

        config = self._apply_defaults(raw_data)
        val_res = self.validate_config(config)
        if not val_res.is_valid:
            raise ValueError(f"Configuration validation failed: {val_res.errors}")

        self._system_config = config
        logger.info("System configuration loaded successfully.")
        return config

    def _get_cached_config(self) -> SystemConfig:
        """Return the cached system config, loading the default file on first use."""
        if self._system_config is None:
            return self.load_config()
        return self._system_config

    def get_camera_params(self, camera_id: Optional[str] = None) -> CameraParameters:
        """Return the system camera parameters.

        ``camera_id`` is accepted for future multi-camera support; currently
        the same camera is returned regardless of its value.
        """
        return self._get_cached_config().camera

    def update_config(self, section: str, key: str, value: Any) -> bool:
        """Update one field of the cached system configuration in place.

        Pydantic models do not validate on plain attribute assignment by
        default, so the new value is first validated by rebuilding the
        section with it. The (possibly coerced) value is then assigned and
        the whole configuration re-validated; the previous value is restored
        if that validation fails.

        Args:
            section: Name of a ``SystemConfig`` section, e.g. ``"api"``.
            key: Name of a field inside that section.
            value: New value for the field.

        Returns:
            True if the value was applied; False if the section or key is
            unknown, or the value was rejected.
        """
        config = self._get_cached_config()
        if section not in self._field_values(config):
            return False
        section_obj = getattr(config, section)
        current = self._field_values(section_obj)
        if key not in current:
            return False

        previous = current[key]
        applied = False
        try:
            # Constructing a fresh section runs the model's own validation.
            candidate = type(section_obj)(**{**current, key: value})
            setattr(section_obj, key, getattr(candidate, key))
            applied = True
            problems = self.validate_config(config).errors
        except (TypeError, ValueError, AttributeError) as exc:
            problems = [str(exc)]

        if not problems:
            return True
        if applied:
            # Never leave the live configuration in an invalid state.
            setattr(section_obj, key, previous)
        logger.warning("Rejected config update %s.%s: %s", section, key, problems)
        return False

    # --- 17.02 Feature: Flight Configuration ---

    def _build_flight_config(self, flight_id: str) -> Optional[FlightConfig]:
        """Build a ``FlightConfig`` from the database record of ``flight_id``.

        Returns None when no database is attached or no flight is found.
        """
        if self.db is None:
            return None
        flight = self.db.get_flight_by_id(flight_id)
        if not flight:
            return None
        return FlightConfig(
            camera_params=flight.camera_params,
            altitude=flight.altitude_m,
            operational_area=self._get_cached_config().operational_area,
        )

    def save_flight_config(self, flight_id: str, config: FlightConfig) -> bool:
        """Cache ``config`` under ``flight_id``.

        Returns:
            False when ``flight_id`` is empty or ``config`` is None,
            True otherwise.
        """
        if not flight_id or config is None:
            return False
        self._flight_configs_cache[flight_id] = config
        return True

    def get_flight_config(self, flight_id: str) -> FlightConfig:
        """Return the flight configuration, building and caching it if needed.

        Raises:
            ValueError: If the flight is neither cached nor in the database.
        """
        cached = self._flight_configs_cache.get(flight_id)
        if cached is not None:
            return cached

        config = self._build_flight_config(flight_id)
        if config is None:
            raise ValueError(f"Flight configuration for {flight_id} not found.")
        self._flight_configs_cache[flight_id] = config
        return config

    def get_operational_altitude(self, flight_id: str) -> float:
        """Return the flight's altitude, warning when outside 10.0-2000.0.

        Raises:
            ValueError: If no configuration exists for ``flight_id``.
        """
        config = self.get_flight_config(flight_id)
        if not (10.0 <= config.altitude <= 2000.0):
            logger.warning("Altitude %s outside expected bounds.", config.altitude)
        return config.altitude

    def get_frame_spacing(self, flight_id: str) -> float:
        """Return the expected displacement between frames.

        Currently always the 100 m default. The flight configuration is
        still looked up, which caches it as a side effect; a missing flight
        is tolerated.
        """
        try:
            # Could incorporate altitude/velocity heuristics here.
            self.get_flight_config(flight_id)
        except ValueError:
            pass
        return _DEFAULT_FRAME_SPACING_M