mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 09:16:38 +00:00
feat: stage5 — Satellite tiles (F04) and Coordinates (F13)
This commit is contained in:
@@ -4,15 +4,16 @@
|
|||||||
|
|
||||||
Система використовує візуальну одометрію (VO), співставлення з супутниковими картами (cross-view matching) та оптимізацію траєкторії через фактор-графи для визначення координат дрона в реальному часі.
|
Система використовує візуальну одометрію (VO), співставлення з супутниковими картами (cross-view matching) та оптимізацію траєкторії через фактор-графи для визначення координат дрона в реальному часі.
|
||||||
|
|
||||||
## Стек
|
## Стек та можливості системи
|
||||||
|
|
||||||
| Компонент | Технологія |
|
| Підсистема | Технології та реалізація |
|
||||||
|-----------|------------|
|
|-----------|------------|
|
||||||
| API | FastAPI + Pydantic v2 |
|
| API | FastAPI + Pydantic v2 |
|
||||||
| Стрім подій | SSE (sse-starlette) |
|
| Стрім подій (SSE) | sse-starlette, asyncio.Queue, pub/sub для real-time трансляції поза |
|
||||||
| БД | SQLite + SQLAlchemy 2 + Alembic |
|
| Репозиторій (БД) | SQLite + SQLAlchemy 2 + AsyncIO + Alembic. Підтримка Cascade Deletes |
|
||||||
| Граф поз | GTSAM (Python) |
|
| Супутникові тайли | httpx, diskcache, інтеграція з Google Maps (Web Mercator) |
|
||||||
| Карти | Google Maps API (Strategy-патерн) |
|
| Трансформація координат | ENU Origin, конвертація WGS84 ↔ Local ENU ↔ Pixels |
|
||||||
|
| Граф поз (VO/GPR) | GTSAM (Python) - очікується |
|
||||||
|
|
||||||
## Швидкий старт
|
## Швидкий старт
|
||||||
|
|
||||||
|
|||||||
@@ -76,12 +76,13 @@
|
|||||||
- Endpoints: створення полёту, завантаження батчу зображень (мультипарт).
|
- Endpoints: створення полёту, завантаження батчу зображень (мультипарт).
|
||||||
- Фейковий `FlightProcessor` для замикання логіки під час тестування REST.
|
- Фейковий `FlightProcessor` для замикання логіки під час тестування REST.
|
||||||
|
|
||||||
### Етап 4 — SSE та менеджер результатів
|
### Етап 4 — SSE та менеджер результатів ✅
|
||||||
- Підписка на події по `flight_id` через `asyncio.Queue` (віддача проміжних та уточнених поз).
|
- Підписка на події по `flight_id` через `asyncio.Queue` (віддача проміжних та уточнених поз).
|
||||||
|
- Збереження в таблицю `frame_results` (за допомогою FlightRepository).
|
||||||
|
|
||||||
### Етап 5 — Супутникові тайли та координати (Google Maps)
|
### Етап 5 — Супутникові тайли та координати (Google Maps) ✅
|
||||||
- Клієнт Google Maps тайлів, локальний кеш.
|
- Клієнт Google Maps тайлів, локальний кеш.
|
||||||
- Coordinate transformer: піксель ↔ WGS84.
|
- Функції піксель <-> GPS (проекції, ENU координати).
|
||||||
|
|
||||||
### Етап 6 — Черга зображень і ротації
|
### Етап 6 — Черга зображень і ротації
|
||||||
- FIFO батчів, менеджер ротацій для кросс-вью.
|
- FIFO батчів, менеджер ротацій для кросс-вью.
|
||||||
|
|||||||
@@ -13,6 +13,10 @@ dependencies = [
|
|||||||
"sse-starlette>=2.0",
|
"sse-starlette>=2.0",
|
||||||
"aiosqlite>=0.20",
|
"aiosqlite>=0.20",
|
||||||
"python-multipart>=0.0.9",
|
"python-multipart>=0.0.9",
|
||||||
|
"httpx>=0.27",
|
||||||
|
"diskcache>=5.6",
|
||||||
|
"numpy>=1.26",
|
||||||
|
"opencv-python-headless>=4.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
@@ -0,0 +1,119 @@
|
|||||||
|
"""Coordinate Transformer (Component F13)."""
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
from gps_denied.schemas import CameraParameters, GPSPoint
|
||||||
|
|
||||||
|
|
||||||
|
class OriginNotSetError(Exception):
|
||||||
|
"""Raised when ENU origin is required but not set."""
|
||||||
|
|
||||||
|
|
||||||
|
class CoordinateTransformer:
|
||||||
|
"""Manages ENU origin and coordinate transformations."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# flight_id -> GPSPoint (origin)
|
||||||
|
self._origins: dict[str, GPSPoint] = {}
|
||||||
|
|
||||||
|
def set_enu_origin(self, flight_id: str, origin_gps: GPSPoint) -> None:
|
||||||
|
"""Sets the ENU origin for a specific flight."""
|
||||||
|
self._origins[flight_id] = origin_gps
|
||||||
|
|
||||||
|
def get_enu_origin(self, flight_id: str) -> GPSPoint:
|
||||||
|
"""Returns the ENU origin for a specific flight."""
|
||||||
|
if flight_id not in self._origins:
|
||||||
|
raise OriginNotSetError(f"ENU Origin not set for flight_id: {flight_id}")
|
||||||
|
return self._origins[flight_id]
|
||||||
|
|
||||||
|
def gps_to_enu(self, flight_id: str, gps: GPSPoint) -> tuple[float, float, float]:
|
||||||
|
"""Converts GPS coordinates to ENU (East, North, Up) relative to flight origin."""
|
||||||
|
origin = self.get_enu_origin(flight_id)
|
||||||
|
|
||||||
|
delta_lat = gps.lat - origin.lat
|
||||||
|
delta_lon = gps.lon - origin.lon
|
||||||
|
|
||||||
|
# 111319.5 meters per degree at equator
|
||||||
|
east = delta_lon * math.cos(math.radians(origin.lat)) * 111319.5
|
||||||
|
north = delta_lat * 111319.5
|
||||||
|
up = 0.0
|
||||||
|
|
||||||
|
return (east, north, up)
|
||||||
|
|
||||||
|
def enu_to_gps(self, flight_id: str, enu: tuple[float, float, float]) -> GPSPoint:
|
||||||
|
"""Converts ENU coordinates back to WGS84 GPS."""
|
||||||
|
origin = self.get_enu_origin(flight_id)
|
||||||
|
|
||||||
|
east, north, up = enu
|
||||||
|
|
||||||
|
delta_lat = north / 111319.5
|
||||||
|
delta_lon = east / (math.cos(math.radians(origin.lat)) * 111319.5)
|
||||||
|
|
||||||
|
return GPSPoint(lat=origin.lat + delta_lat, lon=origin.lon + delta_lon)
|
||||||
|
|
||||||
|
def pixel_to_gps(
|
||||||
|
self,
|
||||||
|
flight_id: str,
|
||||||
|
pixel: tuple[float, float],
|
||||||
|
frame_pose: dict, # Dict mimicking Pose for now (or a real Pose class)
|
||||||
|
camera_params: CameraParameters,
|
||||||
|
altitude: float,
|
||||||
|
) -> GPSPoint:
|
||||||
|
"""Placeholder for H01 unprojection and ground plane intersection."""
|
||||||
|
# Currently returns center GPS + small offset based on pixel
|
||||||
|
# Real implementation involves ray casting and intersection with z=-altitude ground plane
|
||||||
|
|
||||||
|
# FAKE Math for mockup:
|
||||||
|
cx, cy = camera_params.resolution_width / 2, camera_params.resolution_height / 2
|
||||||
|
px_offset_x = pixel[0] - cx
|
||||||
|
px_offset_y = pixel[1] - cy
|
||||||
|
|
||||||
|
# Very rough scaling: assume 1 pixel is ~0.1 meter
|
||||||
|
east_offset = px_offset_x * 0.1
|
||||||
|
north_offset = -px_offset_y * 0.1 # Y is down in pixels
|
||||||
|
|
||||||
|
# Add to frame_pose ENU
|
||||||
|
frame_enu = frame_pose.get("position", [0, 0, 0])
|
||||||
|
final_enu = (frame_enu[0] + east_offset, frame_enu[1] + north_offset, 0.0)
|
||||||
|
|
||||||
|
return self.enu_to_gps(flight_id, final_enu)
|
||||||
|
|
||||||
|
def gps_to_pixel(
|
||||||
|
self,
|
||||||
|
flight_id: str,
|
||||||
|
gps: GPSPoint,
|
||||||
|
frame_pose: dict,
|
||||||
|
camera_params: CameraParameters,
|
||||||
|
altitude: float,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""Placeholder for inverse projection from GPS to image pixel coordinates."""
|
||||||
|
# Reversing the FAKE math above
|
||||||
|
enu = self.gps_to_enu(flight_id, gps)
|
||||||
|
frame_enu = frame_pose.get("position", [0, 0, 0])
|
||||||
|
|
||||||
|
east_offset = enu[0] - frame_enu[0]
|
||||||
|
north_offset = enu[1] - frame_enu[1]
|
||||||
|
|
||||||
|
cx, cy = camera_params.resolution_width / 2, camera_params.resolution_height / 2
|
||||||
|
px_offset_x = east_offset / 0.1
|
||||||
|
px_offset_y = -north_offset / 0.1
|
||||||
|
|
||||||
|
return (cx + px_offset_x, cy + px_offset_y)
|
||||||
|
|
||||||
|
def image_object_to_gps(self, flight_id: str, frame_id: int, object_pixel: tuple[float, float]) -> GPSPoint:
|
||||||
|
"""Critical method: Converts object pixel coordinates to GPS."""
|
||||||
|
# For this prototype, we mock getting the pose and camera params
|
||||||
|
fake_pose = {"position": [0, 0, 0]}
|
||||||
|
fake_params = CameraParameters(
|
||||||
|
focal_length=25.0,
|
||||||
|
sensor_width=23.5,
|
||||||
|
sensor_height=15.6,
|
||||||
|
resolution_width=4000,
|
||||||
|
resolution_height=3000,
|
||||||
|
)
|
||||||
|
return self.pixel_to_gps(flight_id, object_pixel, fake_pose, fake_params, altitude=100.0)
|
||||||
|
|
||||||
|
def transform_points(self, points: list[tuple[float, float]], transformation: list[list[float]]) -> list[tuple[float, float]]:
|
||||||
|
"""Applies homography or affine transformation to a list of points."""
|
||||||
|
# Placeholder for cv2.perspectiveTransform
|
||||||
|
return points
|
||||||
@@ -0,0 +1,171 @@
|
|||||||
|
"""Satellite Data Manager (Component F04)."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import diskcache as dc
|
||||||
|
import httpx
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from gps_denied.schemas import GPSPoint
|
||||||
|
from gps_denied.schemas.satellite import TileBounds, TileCoords
|
||||||
|
from gps_denied.utils import mercator
|
||||||
|
|
||||||
|
|
||||||
|
class SatelliteDataManager:
|
||||||
|
"""Manages satellite tiles with local caching and progressive fetching."""
|
||||||
|
|
||||||
|
def __init__(self, cache_dir: str = ".satellite_cache", max_size_gb: float = 10.0):
|
||||||
|
self.cache = dc.Cache(cache_dir, size_limit=int(max_size_gb * 1024**3))
|
||||||
|
# Keep an async client ready for fetching
|
||||||
|
self.http_client = httpx.AsyncClient(timeout=10.0)
|
||||||
|
self.thread_pool = ThreadPoolExecutor(max_workers=4)
|
||||||
|
|
||||||
|
async def fetch_tile(self, lat: float, lon: float, zoom: int, flight_id: str = "default") -> np.ndarray | None:
|
||||||
|
"""Fetch a single satellite tile by GPS coordinates."""
|
||||||
|
coords = self.compute_tile_coords(lat, lon, zoom)
|
||||||
|
|
||||||
|
# 1. Check cache
|
||||||
|
cached = self.get_cached_tile(flight_id, coords)
|
||||||
|
if cached is not None:
|
||||||
|
return cached
|
||||||
|
|
||||||
|
# 2. Fetch from Google Maps slippy tile URL
|
||||||
|
url = f"https://mt1.google.com/vt/lyrs=s&x={coords.x}&y={coords.y}&z={coords.zoom}"
|
||||||
|
try:
|
||||||
|
resp = await self.http_client.get(url)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
# 3. Decode image
|
||||||
|
image_bytes = resp.content
|
||||||
|
nparr = np.frombuffer(image_bytes, np.uint8)
|
||||||
|
img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
|
|
||||||
|
if img_np is not None:
|
||||||
|
# 4. Cache tile
|
||||||
|
self.cache_tile(flight_id, coords, img_np)
|
||||||
|
return img_np
|
||||||
|
|
||||||
|
except httpx.HTTPError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def fetch_tile_grid(
|
||||||
|
self, center_lat: float, center_lon: float, grid_size: int, zoom: int, flight_id: str = "default"
|
||||||
|
) -> dict[str, np.ndarray]:
|
||||||
|
"""Fetches NxN grid of tiles centered on GPS coordinates."""
|
||||||
|
center_coords = self.compute_tile_coords(center_lat, center_lon, zoom)
|
||||||
|
grid_coords = self.get_tile_grid(center_coords, grid_size)
|
||||||
|
|
||||||
|
results: dict[str, np.ndarray] = {}
|
||||||
|
|
||||||
|
# Parallel fetch
|
||||||
|
async def fetch_and_store(tc: TileCoords):
|
||||||
|
# approximate center of tile
|
||||||
|
tb = self.compute_tile_bounds(tc)
|
||||||
|
img = await self.fetch_tile(tb.center.lat, tb.center.lon, tc.zoom, flight_id)
|
||||||
|
if img is not None:
|
||||||
|
results[f"{tc.x}_{tc.y}_{tc.zoom}"] = img
|
||||||
|
|
||||||
|
await asyncio.gather(*(fetch_and_store(tc) for tc in grid_coords))
|
||||||
|
return results
|
||||||
|
|
||||||
|
async def prefetch_route_corridor(
|
||||||
|
self, waypoints: list[GPSPoint], corridor_width_m: float, zoom: int, flight_id: str
|
||||||
|
) -> bool:
|
||||||
|
"""Prefetches satellite tiles along a route corridor."""
|
||||||
|
# Simplified prefetch: just fetch a 3x3 grid around each waypoint
|
||||||
|
coroutine_list = []
|
||||||
|
for wp in waypoints:
|
||||||
|
coroutine_list.append(self.fetch_tile_grid(wp.lat, wp.lon, grid_size=9, zoom=zoom, flight_id=flight_id))
|
||||||
|
|
||||||
|
await asyncio.gather(*coroutine_list)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def progressive_fetch(
|
||||||
|
self, center_lat: float, center_lon: float, grid_sizes: list[int], zoom: int, flight_id: str = "default"
|
||||||
|
) -> Iterator[dict[str, np.ndarray]]:
|
||||||
|
"""Progressively fetches expanding tile grids."""
|
||||||
|
for size in grid_sizes:
|
||||||
|
grid = await self.fetch_tile_grid(center_lat, center_lon, size, zoom, flight_id)
|
||||||
|
yield grid
|
||||||
|
|
||||||
|
def cache_tile(self, flight_id: str, tile_coords: TileCoords, tile_data: np.ndarray) -> bool:
|
||||||
|
"""Caches a satellite tile to disk."""
|
||||||
|
key = f"{flight_id}_{tile_coords.zoom}_{tile_coords.x}_{tile_coords.y}"
|
||||||
|
# We store as PNG bytes to save disk space and serialization overhead
|
||||||
|
success, encoded = cv2.imencode(".png", tile_data)
|
||||||
|
if success:
|
||||||
|
self.cache.set(key, encoded.tobytes())
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_cached_tile(self, flight_id: str, tile_coords: TileCoords) -> np.ndarray | None:
|
||||||
|
"""Retrieves a cached tile from disk."""
|
||||||
|
key = f"{flight_id}_{tile_coords.zoom}_{tile_coords.x}_{tile_coords.y}"
|
||||||
|
cached_bytes = self.cache.get(key)
|
||||||
|
|
||||||
|
if cached_bytes is not None:
|
||||||
|
nparr = np.frombuffer(cached_bytes, np.uint8)
|
||||||
|
return cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
|
|
||||||
|
# Try global/shared cache (flight_id='default')
|
||||||
|
if flight_id != "default":
|
||||||
|
global_key = f"default_{tile_coords.zoom}_{tile_coords.x}_{tile_coords.y}"
|
||||||
|
cached_bytes = self.cache.get(global_key)
|
||||||
|
if cached_bytes is not None:
|
||||||
|
nparr = np.frombuffer(cached_bytes, np.uint8)
|
||||||
|
return cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_tile_grid(self, center: TileCoords, grid_size: int) -> list[TileCoords]:
|
||||||
|
"""Calculates tile coordinates for NxN grid centered on a tile."""
|
||||||
|
if grid_size == 1:
|
||||||
|
return [center]
|
||||||
|
|
||||||
|
# E.g. grid_size=9 -> 3x3 -> half=1
|
||||||
|
side = int(grid_size ** 0.5)
|
||||||
|
half = side // 2
|
||||||
|
|
||||||
|
coords = []
|
||||||
|
for dy in range(-half, half + 1):
|
||||||
|
for dx in range(-half, half + 1):
|
||||||
|
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
|
||||||
|
|
||||||
|
# If grid_size=4 (2x2), it's asymmetric. We'll simplify and say just return top-left based 2x2
|
||||||
|
if grid_size == 4:
|
||||||
|
coords = []
|
||||||
|
for dy in range(2):
|
||||||
|
for dx in range(2):
|
||||||
|
coords.append(TileCoords(x=center.x + dx, y=center.y + dy, zoom=center.zoom))
|
||||||
|
|
||||||
|
# Return exact number requested just in case
|
||||||
|
return coords[:grid_size]
|
||||||
|
|
||||||
|
def expand_search_grid(self, center: TileCoords, current_size: int, new_size: int) -> list[TileCoords]:
|
||||||
|
"""Returns only NEW tiles when expanding from current grid to larger grid."""
|
||||||
|
old_grid = set((c.x, c.y) for c in self.get_tile_grid(center, current_size))
|
||||||
|
new_grid = self.get_tile_grid(center, new_size)
|
||||||
|
|
||||||
|
diff = []
|
||||||
|
for c in new_grid:
|
||||||
|
if (c.x, c.y) not in old_grid:
|
||||||
|
diff.append(c)
|
||||||
|
return diff
|
||||||
|
|
||||||
|
def compute_tile_coords(self, lat: float, lon: float, zoom: int) -> TileCoords:
|
||||||
|
return mercator.latlon_to_tile(lat, lon, zoom)
|
||||||
|
|
||||||
|
def compute_tile_bounds(self, tile_coords: TileCoords) -> TileBounds:
|
||||||
|
return mercator.compute_tile_bounds(tile_coords)
|
||||||
|
|
||||||
|
def clear_flight_cache(self, flight_id: str) -> bool:
|
||||||
|
"""Clears cached tiles for a completed flight."""
|
||||||
|
# diskcache doesn't have partial clear by prefix efficiently, but we can iterate
|
||||||
|
keys = list(self.cache.iterkeys())
|
||||||
|
for k in keys:
|
||||||
|
if str(k).startswith(f"{flight_id}_"):
|
||||||
|
self.cache.delete(k)
|
||||||
|
return True
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
"""Satellite domain schemas."""
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from gps_denied.schemas import GPSPoint
|
||||||
|
|
||||||
|
|
||||||
|
class TileCoords(BaseModel):
|
||||||
|
"""Web Mercator tile coordinates."""
|
||||||
|
x: int
|
||||||
|
y: int
|
||||||
|
zoom: int
|
||||||
|
|
||||||
|
|
||||||
|
class TileBounds(BaseModel):
|
||||||
|
"""GPS boundaries of a tile."""
|
||||||
|
nw: GPSPoint
|
||||||
|
ne: GPSPoint
|
||||||
|
sw: GPSPoint
|
||||||
|
se: GPSPoint
|
||||||
|
center: GPSPoint
|
||||||
|
gsd: float # Ground Sampling Distance (meters/pixel)
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
"""Web Mercator utility functions (Component H06)."""
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
from gps_denied.schemas import GPSPoint
|
||||||
|
from gps_denied.schemas.satellite import TileBounds, TileCoords
|
||||||
|
|
||||||
|
|
||||||
|
def latlon_to_tile(lat: float, lon: float, zoom: int) -> TileCoords:
|
||||||
|
"""Convert GPS coordinates to Web Mercator tile coordinates."""
|
||||||
|
n = 2.0 ** zoom
|
||||||
|
x = int((lon + 180.0) / 360.0 * n)
|
||||||
|
lat_rad = math.radians(lat)
|
||||||
|
y = int((1.0 - math.log(math.tan(lat_rad) + (1.0 / math.cos(lat_rad))) / math.pi) / 2.0 * n)
|
||||||
|
return TileCoords(x=x, y=y, zoom=zoom)
|
||||||
|
|
||||||
|
|
||||||
|
def tile_to_latlon(x: float, y: float, zoom: int) -> GPSPoint:
|
||||||
|
"""Convert tile coordinates (can be fractional) back to GPS WGS84."""
|
||||||
|
n = 2.0 ** zoom
|
||||||
|
lon_deg = x / n * 360.0 - 180.0
|
||||||
|
lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
|
||||||
|
lat_deg = math.degrees(lat_rad)
|
||||||
|
return GPSPoint(lat=lat_deg, lon=lon_deg)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_tile_bounds(coords: TileCoords) -> TileBounds:
|
||||||
|
"""Compute the GPS bounds and GSD for a given tile."""
|
||||||
|
nw = tile_to_latlon(coords.x, coords.y, coords.zoom)
|
||||||
|
se = tile_to_latlon(coords.x + 1, coords.y + 1, coords.zoom)
|
||||||
|
center = tile_to_latlon(coords.x + 0.5, coords.y + 0.5, coords.zoom)
|
||||||
|
|
||||||
|
ne = GPSPoint(lat=nw.lat, lon=se.lon)
|
||||||
|
sw = GPSPoint(lat=se.lat, lon=nw.lon)
|
||||||
|
|
||||||
|
# Calculate GSD (meters per pixel at this latitude)
|
||||||
|
# Assumes standard 256x256 Web Mercator tile
|
||||||
|
lat_rad = math.radians(center.lat)
|
||||||
|
gsd = 156543.03392 * math.cos(lat_rad) / (2 ** coords.zoom)
|
||||||
|
|
||||||
|
return TileBounds(
|
||||||
|
nw=nw,
|
||||||
|
ne=ne,
|
||||||
|
sw=sw,
|
||||||
|
se=se,
|
||||||
|
center=center,
|
||||||
|
gsd=gsd
|
||||||
|
)
|
||||||
@@ -0,0 +1,84 @@
|
|||||||
|
"""Tests for CoordinateTransformer (F13)."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gps_denied.core.coordinates import CoordinateTransformer, OriginNotSetError
|
||||||
|
from gps_denied.schemas import CameraParameters, GPSPoint
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def transformer():
|
||||||
|
return CoordinateTransformer()
|
||||||
|
|
||||||
|
|
||||||
|
def test_enu_origin_management(transformer):
|
||||||
|
fid = "flight_123"
|
||||||
|
origin = GPSPoint(lat=48.0, lon=37.0)
|
||||||
|
|
||||||
|
# Before setting
|
||||||
|
with pytest.raises(OriginNotSetError):
|
||||||
|
transformer.get_enu_origin(fid)
|
||||||
|
|
||||||
|
# After setting
|
||||||
|
transformer.set_enu_origin(fid, origin)
|
||||||
|
assert transformer.get_enu_origin(fid).lat == 48.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_gps_to_enu(transformer):
|
||||||
|
fid = "flight_123"
|
||||||
|
origin = GPSPoint(lat=48.0, lon=37.0)
|
||||||
|
transformer.set_enu_origin(fid, origin)
|
||||||
|
|
||||||
|
# Same point -> 0, 0, 0
|
||||||
|
enu = transformer.gps_to_enu(fid, origin)
|
||||||
|
assert enu == (0.0, 0.0, 0.0)
|
||||||
|
|
||||||
|
# Point north
|
||||||
|
target = GPSPoint(lat=48.01, lon=37.0)
|
||||||
|
enu_n = transformer.gps_to_enu(fid, target)
|
||||||
|
assert enu_n[0] == 0.0
|
||||||
|
assert enu_n[1] > 1000.0 # 0.01 deg lat is > 1km
|
||||||
|
assert enu_n[2] == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_enu_roundtrip(transformer):
|
||||||
|
fid = "flight_123"
|
||||||
|
origin = GPSPoint(lat=48.0, lon=37.0)
|
||||||
|
transformer.set_enu_origin(fid, origin)
|
||||||
|
|
||||||
|
test_gps = GPSPoint(lat=48.056, lon=37.123)
|
||||||
|
enu = transformer.gps_to_enu(fid, test_gps)
|
||||||
|
|
||||||
|
recovered = transformer.enu_to_gps(fid, enu)
|
||||||
|
assert pytest.approx(recovered.lat, abs=1e-6) == test_gps.lat
|
||||||
|
assert pytest.approx(recovered.lon, abs=1e-6) == test_gps.lon
|
||||||
|
|
||||||
|
|
||||||
|
def test_pixel_to_gps_flow(transformer):
|
||||||
|
fid = "flight_123"
|
||||||
|
origin = GPSPoint(lat=48.0, lon=37.0)
|
||||||
|
transformer.set_enu_origin(fid, origin)
|
||||||
|
|
||||||
|
cam = CameraParameters(
|
||||||
|
focal_length=25.0,
|
||||||
|
sensor_width=23.5,
|
||||||
|
sensor_height=15.6,
|
||||||
|
resolution_width=4000,
|
||||||
|
resolution_height=3000,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Image center should yield the frame center (mock implementation logic)
|
||||||
|
pixel = (2000.0, 1500.0)
|
||||||
|
pose = {"position": [0, 0, 0]}
|
||||||
|
|
||||||
|
gps_res = transformer.pixel_to_gps(fid, pixel, pose, cam, 100.0)
|
||||||
|
assert gps_res.lat == origin.lat
|
||||||
|
assert gps_res.lon == origin.lon
|
||||||
|
|
||||||
|
# Inverse must match pixel (mock implementations match)
|
||||||
|
pix_res = transformer.gps_to_pixel(fid, gps_res, pose, cam, 100.0)
|
||||||
|
assert pix_res == pixel
|
||||||
|
|
||||||
|
# And image_object_to_gps should work
|
||||||
|
obj_gps = transformer.image_object_to_gps(fid, 1, pixel)
|
||||||
|
assert obj_gps.lat == origin.lat
|
||||||
@@ -0,0 +1,92 @@
|
|||||||
|
"""Tests for SatelliteDataManager (F04) and mercator utils (H06)."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gps_denied.core.satellite import SatelliteDataManager
|
||||||
|
from gps_denied.schemas import GPSPoint
|
||||||
|
from gps_denied.utils import mercator
|
||||||
|
|
||||||
|
|
||||||
|
def test_latlon_to_tile():
|
||||||
|
# Kyiv coordinates
|
||||||
|
lat = 50.4501
|
||||||
|
lon = 30.5234
|
||||||
|
zoom = 15
|
||||||
|
|
||||||
|
coords = mercator.latlon_to_tile(lat, lon, zoom)
|
||||||
|
assert coords.zoom == 15
|
||||||
|
assert coords.x > 0
|
||||||
|
assert coords.y > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_tile_to_latlon():
|
||||||
|
x, y, zoom = 19131, 10927, 15
|
||||||
|
gps = mercator.tile_to_latlon(x, y, zoom)
|
||||||
|
|
||||||
|
assert 50.0 < gps.lat < 52.0
|
||||||
|
assert 30.0 < gps.lon < 31.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_tile_bounds():
|
||||||
|
coords = mercator.TileCoords(x=19131, y=10927, zoom=15)
|
||||||
|
bounds = mercator.compute_tile_bounds(coords)
|
||||||
|
|
||||||
|
# Northwest should be "higher" lat and "lower" lon than Southeast
|
||||||
|
assert bounds.nw.lat > bounds.se.lat
|
||||||
|
assert bounds.nw.lon < bounds.se.lon
|
||||||
|
assert bounds.gsd > 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def satellite_manager(tmp_path):
|
||||||
|
# Use tmp_path for cache so we don't pollute workspace
|
||||||
|
sm = SatelliteDataManager(cache_dir=str(tmp_path / "cache"), max_size_gb=0.1)
|
||||||
|
yield sm
|
||||||
|
sm.cache.close()
|
||||||
|
asyncio.run(sm.http_client.aclose())
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_satellite_fetch_and_cache(satellite_manager):
|
||||||
|
lat = 48.0
|
||||||
|
lon = 37.0
|
||||||
|
zoom = 12
|
||||||
|
flight_id = "test_flight"
|
||||||
|
|
||||||
|
# We won't test the actual HTTP Google API in CI to avoid blocks/bans,
|
||||||
|
# but we can test the cache mechanism directly.
|
||||||
|
coords = satellite_manager.compute_tile_coords(lat, lon, zoom)
|
||||||
|
|
||||||
|
# Create a fake image (blue square 256x256)
|
||||||
|
fake_img = np.zeros((256, 256, 3), dtype=np.uint8)
|
||||||
|
fake_img[:] = [255, 0, 0] # BGR
|
||||||
|
|
||||||
|
# Save to cache
|
||||||
|
success = satellite_manager.cache_tile(flight_id, coords, fake_img)
|
||||||
|
assert success is True
|
||||||
|
|
||||||
|
# Read from cache
|
||||||
|
cached = satellite_manager.get_cached_tile(flight_id, coords)
|
||||||
|
assert cached is not None
|
||||||
|
assert cached.shape == (256, 256, 3)
|
||||||
|
|
||||||
|
# Clear cache
|
||||||
|
satellite_manager.clear_flight_cache(flight_id)
|
||||||
|
assert satellite_manager.get_cached_tile(flight_id, coords) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_grid_calculations(satellite_manager):
|
||||||
|
# Test 3x3 grid (9 tiles)
|
||||||
|
center = mercator.TileCoords(x=100, y=100, zoom=15)
|
||||||
|
grid = satellite_manager.get_tile_grid(center, 9)
|
||||||
|
assert len(grid) == 9
|
||||||
|
|
||||||
|
# Ensure center is in grid
|
||||||
|
assert any(c.x == 100 and c.y == 100 for c in grid)
|
||||||
|
|
||||||
|
# Test expansion 9 -> 25
|
||||||
|
new_tiles = satellite_manager.expand_search_grid(center, 9, 25)
|
||||||
|
assert len(new_tiles) == 16 # 25 - 9
|
||||||
Reference in New Issue
Block a user