mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-23 00:56:37 +00:00
initial structure implemented
docs -> _docs
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
from .camera_model import CameraModel
|
||||
from .gsd_calculator import GSDCalculator
|
||||
from .robust_kernels import RobustKernels
|
||||
from .faiss_index_manager import FaissIndexManager
|
||||
from .performance_monitor import PerformanceMonitor
|
||||
from .web_mercator_utils import WebMercatorUtils
|
||||
from .image_rotation_utils import ImageRotationUtils
|
||||
from .batch_validator import BatchValidator
|
||||
|
||||
__all__ = [
|
||||
"CameraModel",
|
||||
"GSDCalculator",
|
||||
"RobustKernels",
|
||||
"FaissIndexManager",
|
||||
"PerformanceMonitor",
|
||||
"WebMercatorUtils",
|
||||
"ImageRotationUtils",
|
||||
"BatchValidator",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
from typing import Optional
|
||||
import numpy as np
|
||||
|
||||
from models.core import ValidationResult
|
||||
from models.images import ImageBatch
|
||||
|
||||
|
||||
class BatchValidator:
|
||||
@staticmethod
|
||||
def validate_batch(batch: ImageBatch) -> ValidationResult:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def validate_image_format(image_bytes: bytes) -> ValidationResult:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def validate_sequence_continuity(
|
||||
current_batch: ImageBatch,
|
||||
expected_start: int,
|
||||
) -> ValidationResult:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def validate_image_dimensions(
|
||||
image: np.ndarray,
|
||||
expected_width: int,
|
||||
expected_height: int,
|
||||
) -> ValidationResult:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def validate_batch_size(
|
||||
batch: ImageBatch,
|
||||
max_size: int = 100,
|
||||
) -> ValidationResult:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
import numpy as np
|
||||
|
||||
from models.core import CameraParameters
|
||||
|
||||
|
||||
class CameraModel:
|
||||
def __init__(self, params: CameraParameters):
|
||||
self._params = params
|
||||
self._K: np.ndarray | None = None
|
||||
|
||||
@property
|
||||
def intrinsic_matrix(self) -> np.ndarray:
|
||||
if self._K is None:
|
||||
fx = self._params.focal_length * self._params.resolution_width / self._params.sensor_width
|
||||
fy = self._params.focal_length * self._params.resolution_height / self._params.sensor_height
|
||||
cx, cy = self._params.get_principal_point()
|
||||
self._K = np.array([
|
||||
[fx, 0, cx],
|
||||
[0, fy, cy],
|
||||
[0, 0, 1]
|
||||
], dtype=np.float64)
|
||||
return self._K
|
||||
|
||||
def project(self, points_3d: np.ndarray) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
def unproject(self, points_2d: np.ndarray, depth: float) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
def undistort_points(self, points: np.ndarray) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_fov(self) -> tuple[float, float]:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
from typing import Optional
|
||||
import numpy as np
|
||||
|
||||
|
||||
class FaissIndexManager:
|
||||
def __init__(self, dimension: int, index_type: str = "IVF"):
|
||||
self._dimension = dimension
|
||||
self._index_type = index_type
|
||||
self._index = None
|
||||
self._id_map: dict[int, str] = {}
|
||||
|
||||
def build_index(self, vectors: np.ndarray, ids: list[str]) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def add_vectors(self, vectors: np.ndarray, ids: list[str]) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def search(
|
||||
self, query: np.ndarray, top_k: int = 10
|
||||
) -> list[tuple[str, float]]:
|
||||
raise NotImplementedError
|
||||
|
||||
def remove_vectors(self, ids: list[str]) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def save_index(self, path: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def load_index(self, path: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_vector_count(self) -> int:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
from models.core import CameraParameters
|
||||
|
||||
|
||||
class GSDCalculator:
|
||||
@staticmethod
|
||||
def calculate_gsd(
|
||||
altitude: float,
|
||||
focal_length: float,
|
||||
sensor_width: float,
|
||||
image_width: int,
|
||||
) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def calculate_footprint(
|
||||
altitude: float,
|
||||
camera_params: CameraParameters,
|
||||
) -> tuple[float, float]:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def altitude_for_gsd(
|
||||
target_gsd: float,
|
||||
focal_length: float,
|
||||
sensor_width: float,
|
||||
image_width: int,
|
||||
) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def calculate_coverage_radius(
|
||||
altitude: float,
|
||||
camera_params: CameraParameters,
|
||||
) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
class ImageRotationUtils:
|
||||
@staticmethod
|
||||
def rotate_image(
|
||||
image: np.ndarray,
|
||||
angle: float,
|
||||
center: tuple[float, float] | None = None,
|
||||
) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def get_rotation_matrix(
|
||||
angle: float,
|
||||
center: tuple[float, float],
|
||||
) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def rotate_points(
|
||||
points: np.ndarray,
|
||||
angle: float,
|
||||
center: tuple[float, float],
|
||||
) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def normalize_angle(angle: float) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def angle_difference(angle1: float, angle2: float) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def interpolate_angles(
|
||||
angles: list[float],
|
||||
weights: list[float] | None = None,
|
||||
) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
class PerformanceMonitor:
|
||||
def __init__(self):
|
||||
self._timings: dict[str, list[float]] = {}
|
||||
self._counters: dict[str, int] = {}
|
||||
|
||||
@contextmanager
|
||||
def measure(self, operation: str):
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
elapsed = time.perf_counter() - start
|
||||
if operation not in self._timings:
|
||||
self._timings[operation] = []
|
||||
self._timings[operation].append(elapsed)
|
||||
|
||||
def increment(self, counter: str, value: int = 1) -> None:
|
||||
if counter not in self._counters:
|
||||
self._counters[counter] = 0
|
||||
self._counters[counter] += value
|
||||
|
||||
def get_average(self, operation: str) -> Optional[float]:
|
||||
if operation not in self._timings or not self._timings[operation]:
|
||||
return None
|
||||
return sum(self._timings[operation]) / len(self._timings[operation])
|
||||
|
||||
def get_statistics(self, operation: str) -> Optional[dict]:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_counter(self, counter: str) -> int:
|
||||
return self._counters.get(counter, 0)
|
||||
|
||||
def reset(self) -> None:
|
||||
self._timings.clear()
|
||||
self._counters.clear()
|
||||
|
||||
def get_report(self) -> dict:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
class RobustKernels:
|
||||
@staticmethod
|
||||
def huber(residual: float, delta: float = 1.0) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def cauchy(residual: float, c: float = 1.0) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def tukey(residual: float, c: float = 4.685) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def huber_weight(residual: float, delta: float = 1.0) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def cauchy_weight(residual: float, c: float = 1.0) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def compute_robust_covariance(
|
||||
residuals: np.ndarray,
|
||||
kernel: str = "huber",
|
||||
**kwargs,
|
||||
) -> np.ndarray:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
import math
|
||||
|
||||
from models.core import GPSPoint
|
||||
from models.satellite import TileCoords, TileBounds
|
||||
|
||||
|
||||
class WebMercatorUtils:
|
||||
EARTH_RADIUS = 6378137.0
|
||||
TILE_SIZE = 256
|
||||
|
||||
@classmethod
|
||||
def gps_to_tile(cls, gps: GPSPoint, zoom: int) -> TileCoords:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def tile_to_gps(cls, coords: TileCoords) -> GPSPoint:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_tile_bounds(cls, coords: TileCoords) -> TileBounds:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def gps_to_pixel(
|
||||
cls, gps: GPSPoint, zoom: int
|
||||
) -> tuple[float, float]:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def pixel_to_gps(
|
||||
cls, pixel: tuple[float, float], zoom: int
|
||||
) -> GPSPoint:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def meters_per_pixel(cls, lat: float, zoom: int) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_tiles_in_bounds(
|
||||
cls, nw: GPSPoint, se: GPSPoint, zoom: int
|
||||
) -> list[TileCoords]:
|
||||
raise NotImplementedError
|
||||
|
||||
Reference in New Issue
Block a user