mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 12:21:14 +00:00
c5ffc14fe9
Adds an opt-in C5-internal orthorectifier (`_orthorectifier.py`) that emits at most one tile-aligned JPEG candidate per nav frame to the C6 `TileStore.write_tile` API. Quality gates fire before any OpenCV work: covariance Frobenius, inlier floor, source-label (`SATELLITE_ANCHORED` only), and once-per-frame rate limit. Cross-component import rule (AZ-507) is preserved: c5_state never imports c6_tile_cache. `runtime_root.state_factory` carries a new `_C6MidFlightIngestAdapter` that builds the canonical `TileMetadata` (`ONBOARD_INGEST` / `FRESH` / `PENDING`), hashes the JPEG, and translates `FreshnessRejectionError` to a `None` return so the orthorectifier silently swallows freshness rejection per AC-NEW-3. Wiring is opt-in via `C5StateConfig.orthorectifier.enabled`; existing tests/binaries default to disabled and are unaffected. Both `GtsamIsam2StateEstimator` and `EskfStateEstimator` participate through new `attach_orthorectifier` / `set_latest_nav_frame` extension methods (Protocol surface unchanged). Tests: 22 new unit tests cover AC-1..AC-9 plus inlier-floor gate plus the composition-root adapter. 216/216 c5_state and 38/38 runtime-root + compose tests pass. Co-authored-by: Cursor <cursoragent@cursor.com>
716 lines
23 KiB
Python
716 lines
23 KiB
Python
"""AZ-389 — C5 Orthorectifier → C6 mid-flight tile candidate emission.
|
|
|
|
Covers AC-1..AC-9 from
|
|
``_docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md``.
|
|
|
|
Tests instantiate :class:`Orthorectifier` directly with a fake
|
|
:class:`MidFlightTileWriter`, then drive synthetic
|
|
``(NavCameraFrame, EstimatorOutput, cov_6x6, ...)`` tuples through
|
|
:meth:`Orthorectifier.try_emit_candidate`. The composition-root
|
|
adapter (``_C6MidFlightIngestAdapter``) is exercised separately
|
|
against fake C6 protocols.
|
|
|
|
Quality-gate / silent-return / freshness-rejection branches are
|
|
asserted via call-count checks against the fake writer; the
|
|
homography-geometry branch (AC-1) verifies that the ground-plane
|
|
homography projects a synthetic ground point to the expected image
|
|
pixel within 1 px (the "1-pixel tolerance" gate).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import uuid4
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.calibration import CameraCalibration
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.nav import NavCameraFrame
|
|
from gps_denied_onboard._types.state import (
|
|
EstimatorOutput,
|
|
PoseSourceLabel,
|
|
Quat,
|
|
)
|
|
from gps_denied_onboard.components.c5_state._orthorectifier import (
|
|
MidFlightTileWriter,
|
|
Orthorectifier,
|
|
OrthorectifierThresholds,
|
|
_compose_tile_to_image_homography,
|
|
_ground_plane_homography,
|
|
_invert_se3,
|
|
_quat_to_rotation_matrix,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Fixtures + helpers.
|
|
|
|
|
|
_FLIGHT_ID = "flt-test-001"
|
|
_COMPANION_ID = "comp-test-001"
|
|
_ESTIMATOR_LABEL = "gtsam_isam2"
|
|
|
|
|
|
def _make_thresholds(
|
|
*,
|
|
cov_norm_threshold: float = 1.0,
|
|
inlier_floor: int = 30,
|
|
tile_size_meters: float = 100.0,
|
|
tile_size_pixels: int = 64,
|
|
zoom_level: int = 18,
|
|
jpeg_quality: int = 90,
|
|
) -> OrthorectifierThresholds:
|
|
return OrthorectifierThresholds(
|
|
cov_norm_threshold=cov_norm_threshold,
|
|
inlier_floor=inlier_floor,
|
|
tile_size_meters=tile_size_meters,
|
|
tile_size_pixels=tile_size_pixels,
|
|
zoom_level=zoom_level,
|
|
jpeg_quality=jpeg_quality,
|
|
)
|
|
|
|
|
|
def _nadir_calibration() -> CameraCalibration:
|
|
"""Nadir-looking camera, 500-pixel focal length, centred principal point."""
|
|
K = np.array(
|
|
[
|
|
[500.0, 0.0, 200.0],
|
|
[0.0, 500.0, 200.0],
|
|
[0.0, 0.0, 1.0],
|
|
],
|
|
dtype=np.float64,
|
|
)
|
|
# body_T_cam — camera +Z aligned with body +Z (i.e. body coordinate
|
|
# frame is camera coordinate frame). The orthorectifier will then
|
|
# use the pose's world_T_body directly as world_T_cam, which is the
|
|
# simplest geometric setup for a unit test.
|
|
body_T_cam = np.eye(4, dtype=np.float64)
|
|
return CameraCalibration(
|
|
camera_id="test_cam",
|
|
intrinsics_3x3=K,
|
|
distortion=np.zeros(5, dtype=np.float64),
|
|
body_to_camera_se3=body_T_cam,
|
|
acquisition_method="synthetic",
|
|
)
|
|
|
|
|
|
def _make_image(size: int = 400, dtype: Any = np.uint8) -> np.ndarray:
|
|
"""Synthetic 400x400 grayscale image — a single bright square at the centre."""
|
|
img = np.zeros((size, size), dtype=dtype)
|
|
cs = size // 2
|
|
img[cs - 20 : cs + 20, cs - 20 : cs + 20] = 255
|
|
return img
|
|
|
|
|
|
def _frame(frame_id: int = 1) -> NavCameraFrame:
|
|
return NavCameraFrame(
|
|
frame_id=frame_id,
|
|
timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
|
|
image=_make_image(),
|
|
camera_calibration_id="test_cam",
|
|
)
|
|
|
|
|
|
def _camera_looking_down_quat() -> Quat:
|
|
"""Quaternion for a body-frame camera whose +Z axis points along world -Z (nadir).
|
|
|
|
Built as a 180° rotation about world +X. This flips body +Z to
|
|
world -Z (so a point on the ground at world +Z = -altitude appears
|
|
in front of the camera) and body +Y to world -Y; the X axis is
|
|
preserved. The standard pinhole projection then maps ground (X, Y)
|
|
to pixel (cx + fx*X/h, cy - fy*Y/h) where ``h`` is altitude — the
|
|
expected nadir homography for unit tests.
|
|
"""
|
|
angle = np.pi
|
|
axis = np.array([1.0, 0.0, 0.0])
|
|
# q = (cos(theta/2), axis * sin(theta/2))
|
|
half = angle / 2.0
|
|
return Quat(
|
|
w=float(np.cos(half)),
|
|
x=float(axis[0] * np.sin(half)),
|
|
y=float(axis[1] * np.sin(half)),
|
|
z=float(axis[2] * np.sin(half)),
|
|
)
|
|
|
|
|
|
def _output(
|
|
*,
|
|
lat: float = 50.45,
|
|
lon: float = 30.52,
|
|
alt_m: float = 100.0,
|
|
cov: np.ndarray | None = None,
|
|
source_label: PoseSourceLabel = PoseSourceLabel.SATELLITE_ANCHORED,
|
|
quat: Quat | None = None,
|
|
) -> EstimatorOutput:
|
|
if cov is None:
|
|
cov = np.eye(6, dtype=np.float64) * 0.01
|
|
if quat is None:
|
|
quat = _camera_looking_down_quat()
|
|
return EstimatorOutput(
|
|
frame_id=uuid4(),
|
|
position_wgs84=LatLonAlt(lat_deg=lat, lon_deg=lon, alt_m=alt_m),
|
|
orientation_world_T_body=quat,
|
|
velocity_world_mps=(0.0, 0.0, 0.0),
|
|
covariance_6x6=cov,
|
|
source_label=source_label,
|
|
last_satellite_anchor_age_ms=50,
|
|
smoothed=False,
|
|
emitted_at=1_000_000_000,
|
|
)
|
|
|
|
|
|
class _RecordingWriter:
|
|
"""Fake :class:`MidFlightTileWriter` — records every call.
|
|
|
|
By default returns ``(zoom, lat, lon)`` from the supplied input
|
|
(i.e. simulates a successful insert). Configure ``return_value`` to
|
|
a literal value (or ``None`` to simulate the C6 freshness gate
|
|
rejecting the insert) to override per-test.
|
|
"""
|
|
|
|
def __init__(self, *, return_override: Any = ..., side_effect: Any = None) -> None:
|
|
self.calls: list[dict[str, Any]] = []
|
|
self._return_override = return_override
|
|
self._side_effect = side_effect
|
|
|
|
def write_mid_flight_tile(
|
|
self,
|
|
*,
|
|
jpeg_bytes: bytes,
|
|
zoom_level: int,
|
|
lat: float,
|
|
lon: float,
|
|
tile_size_meters: float,
|
|
tile_size_pixels: int,
|
|
capture_timestamp: datetime,
|
|
flight_id: str,
|
|
companion_id: str,
|
|
estimator_label: str,
|
|
covariance_2x2_horizontal: tuple[
|
|
tuple[float, float], tuple[float, float]
|
|
],
|
|
last_anchor_age_ms: int,
|
|
mre_px: float,
|
|
imu_bias_norm: float,
|
|
) -> tuple[int, float, float] | None:
|
|
kv = {
|
|
"jpeg_bytes": jpeg_bytes,
|
|
"zoom_level": zoom_level,
|
|
"lat": lat,
|
|
"lon": lon,
|
|
"tile_size_meters": tile_size_meters,
|
|
"tile_size_pixels": tile_size_pixels,
|
|
"capture_timestamp": capture_timestamp,
|
|
"flight_id": flight_id,
|
|
"companion_id": companion_id,
|
|
"estimator_label": estimator_label,
|
|
"covariance_2x2_horizontal": covariance_2x2_horizontal,
|
|
"last_anchor_age_ms": last_anchor_age_ms,
|
|
"mre_px": mre_px,
|
|
"imu_bias_norm": imu_bias_norm,
|
|
}
|
|
self.calls.append(kv)
|
|
if self._side_effect is not None:
|
|
raise self._side_effect
|
|
if self._return_override is ...:
|
|
return (zoom_level, lat, lon)
|
|
return self._return_override
|
|
|
|
|
|
def _make_orthorectifier(
|
|
*,
|
|
writer: MidFlightTileWriter | None = None,
|
|
thresholds: OrthorectifierThresholds | None = None,
|
|
calibration: CameraCalibration | None = None,
|
|
) -> tuple[Orthorectifier, _RecordingWriter]:
|
|
if writer is None:
|
|
writer = _RecordingWriter()
|
|
return (
|
|
Orthorectifier(
|
|
tile_writer=writer,
|
|
thresholds=thresholds or _make_thresholds(),
|
|
camera_calibration=calibration or _nadir_calibration(),
|
|
flight_id=_FLIGHT_ID,
|
|
companion_id=_COMPANION_ID,
|
|
),
|
|
writer, # type: ignore[return-value]
|
|
)
|
|
|
|
|
|
def _emit_kwargs(
|
|
*,
|
|
frame: NavCameraFrame | None = None,
|
|
output: EstimatorOutput | None = None,
|
|
cov_6x6: np.ndarray | None = None,
|
|
inlier_count: int = 60,
|
|
mre_px: float = 1.5,
|
|
source_label: PoseSourceLabel | None = None,
|
|
last_anchor_age_ms: int = 50,
|
|
imu_bias_norm: float = 0.01,
|
|
estimator_label: str = _ESTIMATOR_LABEL,
|
|
) -> dict[str, Any]:
|
|
out = output if output is not None else _output()
|
|
return {
|
|
"frame": frame if frame is not None else _frame(),
|
|
"pose_estimate": out,
|
|
"cov_6x6": cov_6x6 if cov_6x6 is not None else out.covariance_6x6,
|
|
"inlier_count": inlier_count,
|
|
"mre_px": mre_px,
|
|
"source_label": source_label or out.source_label,
|
|
"last_anchor_age_ms": last_anchor_age_ms,
|
|
"imu_bias_norm": imu_bias_norm,
|
|
"estimator_label": estimator_label,
|
|
}
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2 — covariance-norm gate.
|
|
|
|
|
|
def test_ac2_cov_norm_above_threshold_blocks_emission() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier(
|
|
thresholds=_make_thresholds(cov_norm_threshold=1.0)
|
|
)
|
|
big_cov = np.eye(6, dtype=np.float64) * 10.0
|
|
out = _output(cov=big_cov)
|
|
|
|
# Act
|
|
result = ortho.try_emit_candidate(**_emit_kwargs(output=out, cov_6x6=big_cov))
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert writer.calls == []
|
|
|
|
|
|
def test_ac2_cov_norm_below_threshold_emits() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier(
|
|
thresholds=_make_thresholds(cov_norm_threshold=10.0)
|
|
)
|
|
|
|
# Act
|
|
result = ortho.try_emit_candidate(**_emit_kwargs())
|
|
|
|
# Assert
|
|
assert result is not None
|
|
assert len(writer.calls) == 1
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3 — source-label gate.
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"label",
|
|
[PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.DEAD_RECKONED],
|
|
)
|
|
def test_ac3_non_satellite_anchored_blocked(label: PoseSourceLabel) -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier()
|
|
out = _output(source_label=label)
|
|
|
|
# Act
|
|
result = ortho.try_emit_candidate(
|
|
**_emit_kwargs(output=out, source_label=label)
|
|
)
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert writer.calls == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4 — once-per-frame rate limit.
|
|
|
|
|
|
def test_ac4_same_frame_id_processed_only_once() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier()
|
|
frame = _frame(frame_id=42)
|
|
|
|
# Act
|
|
first = ortho.try_emit_candidate(**_emit_kwargs(frame=frame))
|
|
second = ortho.try_emit_candidate(**_emit_kwargs(frame=frame))
|
|
|
|
# Assert
|
|
assert first is not None
|
|
assert second is None
|
|
assert len(writer.calls) == 1
|
|
|
|
|
|
def test_ac4_distinct_frame_ids_each_emit() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier()
|
|
|
|
# Act
|
|
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1)))
|
|
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2)))
|
|
|
|
# Assert
|
|
assert len(writer.calls) == 2
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8 — defensive missing-input skip.
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"missing",
|
|
["frame", "pose_estimate", "cov_6x6"],
|
|
)
|
|
def test_ac8_missing_inputs_silent_return_none(missing: str) -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier()
|
|
kwargs = _emit_kwargs()
|
|
kwargs[missing] = None
|
|
|
|
# Act
|
|
result = ortho.try_emit_candidate(**kwargs)
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert writer.calls == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5 / AC-6 — writer is called with the right metadata + SHA256.
|
|
|
|
|
|
def test_ac5_writer_called_with_onboard_ingest_metadata() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier()
|
|
out = _output(lat=50.45, lon=30.52, alt_m=100.0)
|
|
|
|
# Act
|
|
ortho.try_emit_candidate(**_emit_kwargs(output=out))
|
|
|
|
# Assert — writer received the canonical onboard-ingest payload.
|
|
assert len(writer.calls) == 1
|
|
call = writer.calls[0]
|
|
assert call["zoom_level"] == 18
|
|
assert call["lat"] == pytest.approx(50.45)
|
|
assert call["lon"] == pytest.approx(30.52)
|
|
assert call["tile_size_meters"] == pytest.approx(100.0)
|
|
assert call["tile_size_pixels"] == 64
|
|
assert call["flight_id"] == _FLIGHT_ID
|
|
assert call["companion_id"] == _COMPANION_ID
|
|
assert call["estimator_label"] == _ESTIMATOR_LABEL
|
|
assert call["last_anchor_age_ms"] == 50
|
|
assert call["mre_px"] == pytest.approx(1.5)
|
|
assert call["imu_bias_norm"] == pytest.approx(0.01)
|
|
cov2x2 = call["covariance_2x2_horizontal"]
|
|
assert cov2x2[0][0] == pytest.approx(0.01)
|
|
assert cov2x2[1][1] == pytest.approx(0.01)
|
|
|
|
|
|
def test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier(
|
|
thresholds=_make_thresholds(tile_size_pixels=128, jpeg_quality=80)
|
|
)
|
|
|
|
# Act
|
|
ortho.try_emit_candidate(**_emit_kwargs())
|
|
|
|
# Assert — emitted blob is a JPEG that decodes to the configured tile size.
|
|
assert len(writer.calls) == 1
|
|
jpeg_bytes = writer.calls[0]["jpeg_bytes"]
|
|
assert isinstance(jpeg_bytes, bytes)
|
|
assert len(jpeg_bytes) > 0
|
|
assert jpeg_bytes[:3] == b"\xff\xd8\xff" # JPEG SOI marker
|
|
decoded = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
|
|
assert decoded is not None
|
|
assert decoded.shape[:2] == (128, 128)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9 — writer-side freshness rejection swallowed.
|
|
|
|
|
|
def test_ac9_writer_returning_none_swallowed() -> None:
|
|
# Arrange
|
|
writer = _RecordingWriter(return_override=None)
|
|
ortho, _ = _make_orthorectifier(writer=writer)
|
|
|
|
# Act
|
|
result = ortho.try_emit_candidate(**_emit_kwargs())
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert len(writer.calls) == 1 # the writer WAS called
|
|
|
|
|
|
def test_ac9_writer_raising_swallowed_with_warning(caplog: pytest.LogCaptureFixture) -> None:
|
|
# Arrange
|
|
writer = _RecordingWriter(side_effect=RuntimeError("infra blew up"))
|
|
ortho, _ = _make_orthorectifier(writer=writer)
|
|
|
|
# Act
|
|
with caplog.at_level(logging.WARNING, logger="gps_denied_onboard.c5_state.orthorectifier"):
|
|
result = ortho.try_emit_candidate(**_emit_kwargs())
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert any(
|
|
rec.message == "c5.state.orthorectifier_writer_failed" for rec in caplog.records
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7 — first-emission INFO log.
|
|
|
|
|
|
def test_ac7_first_emission_logs_info_subsequent_logs_debug(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
# Arrange
|
|
ortho, _ = _make_orthorectifier()
|
|
|
|
# Act
|
|
with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c5_state.orthorectifier"):
|
|
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1)))
|
|
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2)))
|
|
|
|
# Assert
|
|
info_records = [r for r in caplog.records if r.levelno == logging.INFO]
|
|
assert any(r.message == "c5.state.first_mid_flight_candidate" for r in info_records)
|
|
assert sum(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) == 1
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Inlier-floor gate (specified alongside AC-2 in the task body).
|
|
|
|
|
|
def test_inlier_floor_blocks_emission() -> None:
|
|
# Arrange
|
|
ortho, writer = _make_orthorectifier(
|
|
thresholds=_make_thresholds(inlier_floor=50)
|
|
)
|
|
|
|
# Act — inlier_count == floor is rejected (strict inequality).
|
|
result = ortho.try_emit_candidate(**_emit_kwargs(inlier_count=50))
|
|
|
|
# Assert
|
|
assert result is None
|
|
assert writer.calls == []
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1 — orthorectification correctness (geometry).
|
|
|
|
|
|
def test_ac1_homography_projects_origin_to_principal_point() -> None:
|
|
"""Camera straight down at altitude h, K = [fx 0 cx; 0 fy cy; 0 0 1].
|
|
|
|
Ground point (X=0, Y=0) lies under the camera. With the body-
|
|
looking-down convention (180° about world +X), the
|
|
``world_T_cam = world_T_body`` for ``body_T_cam = I``:
|
|
|
|
* Camera position in world = (0, 0, h)
|
|
* Camera Z axis points along world -Z (hits ground at Z=0)
|
|
|
|
The expected projection of ground (0, 0, 0) is the principal
|
|
point ``(cx, cy)`` to within sub-pixel tolerance.
|
|
"""
|
|
# Arrange
|
|
cal = _nadir_calibration()
|
|
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
|
|
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
|
|
out = _output(alt_m=100.0)
|
|
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
|
|
world_T_body = np.eye(4, dtype=np.float64)
|
|
world_T_body[:3, :3] = R
|
|
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
|
|
world_T_cam = world_T_body @ body_T_cam
|
|
cam_T_world = _invert_se3(world_T_cam)
|
|
H = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
|
|
|
|
# Act — project ground origin (X=0, Y=0, 1).
|
|
homog = H @ np.array([0.0, 0.0, 1.0])
|
|
pix = homog[:2] / homog[2]
|
|
|
|
# Assert
|
|
assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0)
|
|
|
|
|
|
def test_ac1_homography_projects_offset_within_one_pixel() -> None:
|
|
"""Ground (X=10 m, Y=0) at altitude 100 m with fx=500 should land at cx + 50."""
|
|
# Arrange
|
|
cal = _nadir_calibration()
|
|
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
|
|
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
|
|
out = _output(alt_m=100.0)
|
|
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
|
|
world_T_body = np.eye(4, dtype=np.float64)
|
|
world_T_body[:3, :3] = R
|
|
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
|
|
world_T_cam = world_T_body @ body_T_cam
|
|
cam_T_world = _invert_se3(world_T_cam)
|
|
H = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
|
|
|
|
# Act
|
|
homog = H @ np.array([10.0, 0.0, 1.0])
|
|
pix = homog[:2] / homog[2]
|
|
|
|
# Assert — fx=500, h=100 → 10 m on the ground projects to 50 px from cx.
|
|
# The 180° flip about world +X sends body +Y to world -Y so a +X ground
|
|
# offset stays at +X in image space (cx + 50) on the row of the
|
|
# principal point.
|
|
assert pix[0] == pytest.approx(250.0, abs=1.0)
|
|
assert pix[1] == pytest.approx(200.0, abs=1.0)
|
|
|
|
|
|
def test_compose_tile_to_image_homography_is_centred_on_camera() -> None:
|
|
"""Tile pixel ``(N/2, N/2)`` corresponds to the camera nadir on the ground."""
|
|
# Arrange
|
|
cal = _nadir_calibration()
|
|
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
|
|
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
|
|
out = _output(alt_m=100.0)
|
|
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
|
|
world_T_body = np.eye(4, dtype=np.float64)
|
|
world_T_body[:3, :3] = R
|
|
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
|
|
world_T_cam = world_T_body @ body_T_cam
|
|
cam_T_world = _invert_se3(world_T_cam)
|
|
H_g2i = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
|
|
N = 64
|
|
H_t2i = _compose_tile_to_image_homography(
|
|
H_ground_to_image=H_g2i,
|
|
ground_x_center=float(world_T_cam[0, 3]),
|
|
ground_y_center=float(world_T_cam[1, 3]),
|
|
tile_size_meters=100.0,
|
|
tile_size_pixels=N,
|
|
)
|
|
|
|
# Act — middle of the tile maps to the principal point of the camera image.
|
|
homog = H_t2i @ np.array([N / 2.0, N / 2.0, 1.0])
|
|
pix = homog[:2] / homog[2]
|
|
|
|
# Assert
|
|
assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Composition-root adapter — _C6MidFlightIngestAdapter.
|
|
|
|
|
|
def test_adapter_translates_freshness_rejection_to_none() -> None:
|
|
"""The adapter catches :class:`FreshnessRejectionError` and returns ``None`` so the orthorectifier silently drops the candidate."""
|
|
# Arrange
|
|
from gps_denied_onboard.components.c6_tile_cache.errors import (
|
|
FreshnessRejectionError,
|
|
)
|
|
from gps_denied_onboard.runtime_root.state_factory import (
|
|
_C6MidFlightIngestAdapter,
|
|
)
|
|
|
|
fake_store = mock.MagicMock()
|
|
fake_store.write_tile.side_effect = FreshnessRejectionError(
|
|
"rear_sector stale"
|
|
)
|
|
adapter = _C6MidFlightIngestAdapter(tile_store=fake_store)
|
|
|
|
# Act
|
|
result = adapter.write_mid_flight_tile(
|
|
jpeg_bytes=b"\xff\xd8\xff\xe0fakejpeg",
|
|
zoom_level=18,
|
|
lat=50.45,
|
|
lon=30.52,
|
|
tile_size_meters=100.0,
|
|
tile_size_pixels=64,
|
|
capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
|
|
flight_id=_FLIGHT_ID,
|
|
companion_id=_COMPANION_ID,
|
|
estimator_label=_ESTIMATOR_LABEL,
|
|
covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)),
|
|
last_anchor_age_ms=50,
|
|
mre_px=1.5,
|
|
imu_bias_norm=0.01,
|
|
)
|
|
|
|
# Assert
|
|
assert result is None
|
|
fake_store.write_tile.assert_called_once()
|
|
|
|
|
|
def test_adapter_calls_write_tile_with_onboard_ingest_metadata() -> None:
|
|
"""On a successful insert the adapter builds an ``ONBOARD_INGEST`` row, sets ``PENDING`` voting status, and computes the SHA256 of the JPEG."""
|
|
# Arrange
|
|
from gps_denied_onboard.components.c6_tile_cache._types import (
|
|
FreshnessLabel,
|
|
TileSource,
|
|
VotingStatus,
|
|
)
|
|
from gps_denied_onboard.runtime_root.state_factory import (
|
|
_C6MidFlightIngestAdapter,
|
|
)
|
|
|
|
fake_store = mock.MagicMock()
|
|
adapter = _C6MidFlightIngestAdapter(tile_store=fake_store)
|
|
jpeg_bytes = b"\xff\xd8\xff\xe0testjpegblob"
|
|
|
|
# Act
|
|
result = adapter.write_mid_flight_tile(
|
|
jpeg_bytes=jpeg_bytes,
|
|
zoom_level=18,
|
|
lat=50.45,
|
|
lon=30.52,
|
|
tile_size_meters=100.0,
|
|
tile_size_pixels=64,
|
|
capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
|
|
flight_id=_FLIGHT_ID,
|
|
companion_id=_COMPANION_ID,
|
|
estimator_label=_ESTIMATOR_LABEL,
|
|
covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)),
|
|
last_anchor_age_ms=50,
|
|
mre_px=1.5,
|
|
imu_bias_norm=0.01,
|
|
)
|
|
|
|
# Assert — write_tile invoked with (blob, metadata).
|
|
fake_store.write_tile.assert_called_once()
|
|
args, kwargs = fake_store.write_tile.call_args
|
|
assert kwargs == {}
|
|
assert args[0] == jpeg_bytes
|
|
metadata = args[1]
|
|
assert metadata.source == TileSource.ONBOARD_INGEST
|
|
assert metadata.freshness_label == FreshnessLabel.FRESH
|
|
assert metadata.voting_status == VotingStatus.PENDING
|
|
assert metadata.flight_id == _FLIGHT_ID
|
|
assert metadata.companion_id == _COMPANION_ID
|
|
assert metadata.tile_id.zoom_level == 18
|
|
assert metadata.tile_id.lat == pytest.approx(50.45)
|
|
assert metadata.tile_id.lon == pytest.approx(30.52)
|
|
assert metadata.content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest()
|
|
quality = metadata.quality_metadata
|
|
assert quality is not None
|
|
assert quality.estimator_label == _ESTIMATOR_LABEL
|
|
assert quality.covariance_2x2 == ((0.01, 0.0), (0.0, 0.01))
|
|
assert quality.last_anchor_age_ms == 50
|
|
assert result == (18, pytest.approx(50.45), pytest.approx(30.52))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# OrthorectifierConfig validation (config block sanity).
|
|
|
|
|
|
def test_thresholds_reject_non_positive_cov_norm() -> None:
|
|
# Act / Assert
|
|
with pytest.raises(ValueError):
|
|
OrthorectifierThresholds(cov_norm_threshold=0.0, inlier_floor=10)
|
|
|
|
|
|
def test_thresholds_reject_negative_inlier_floor() -> None:
|
|
# Act / Assert
|
|
with pytest.raises(ValueError):
|
|
OrthorectifierThresholds(cov_norm_threshold=1.0, inlier_floor=-1)
|