"""AZ-389 — C5 Orthorectifier → C6 mid-flight tile candidate emission. Covers AC-1..AC-9 from ``_docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md``. Tests instantiate :class:`Orthorectifier` directly with a fake :class:`MidFlightTileWriter`, then drive synthetic ``(NavCameraFrame, EstimatorOutput, cov_6x6, ...)`` tuples through :meth:`Orthorectifier.try_emit_candidate`. The composition-root adapter (``_C6MidFlightIngestAdapter``) is exercised separately against fake C6 protocols. Quality-gate / silent-return / freshness-rejection branches are asserted via call-count checks against the fake writer; the homography-geometry branch (AC-1) verifies that the ground-plane homography projects a synthetic ground point to the expected image pixel within 1 px (the "1-pixel tolerance" gate). """ from __future__ import annotations import hashlib import logging from datetime import datetime, timezone from typing import Any from unittest import mock from uuid import uuid4 import cv2 import numpy as np import pytest from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.geo import LatLonAlt from gps_denied_onboard._types.nav import NavCameraFrame from gps_denied_onboard._types.state import ( EstimatorOutput, PoseSourceLabel, Quat, ) from gps_denied_onboard.components.c5_state._orthorectifier import ( MidFlightTileWriter, Orthorectifier, OrthorectifierThresholds, _compose_tile_to_image_homography, _ground_plane_homography, _invert_se3, _quat_to_rotation_matrix, ) # ---------------------------------------------------------------------- # Fixtures + helpers. _FLIGHT_ID = "flt-test-001" _COMPANION_ID = "comp-test-001" _ESTIMATOR_LABEL = "gtsam_isam2" def _make_thresholds( *, cov_norm_threshold: float = 1.0, inlier_floor: int = 30, tile_size_meters: float = 100.0, tile_size_pixels: int = 64, zoom_level: int = 18, jpeg_quality: int = 90, ) -> OrthorectifierThresholds: return OrthorectifierThresholds( cov_norm_threshold=cov_norm_threshold, inlier_floor=inlier_floor, tile_size_meters=tile_size_meters, tile_size_pixels=tile_size_pixels, zoom_level=zoom_level, jpeg_quality=jpeg_quality, ) def _nadir_calibration() -> CameraCalibration: """Nadir-looking camera, 500-pixel focal length, centred principal point.""" K = np.array( [ [500.0, 0.0, 200.0], [0.0, 500.0, 200.0], [0.0, 0.0, 1.0], ], dtype=np.float64, ) # body_T_cam — camera +Z aligned with body +Z (i.e. body coordinate # frame is camera coordinate frame). The orthorectifier will then # use the pose's world_T_body directly as world_T_cam, which is the # simplest geometric setup for a unit test. body_T_cam = np.eye(4, dtype=np.float64) return CameraCalibration( camera_id="test_cam", intrinsics_3x3=K, distortion=np.zeros(5, dtype=np.float64), body_to_camera_se3=body_T_cam, acquisition_method="synthetic", ) def _make_image(size: int = 400, dtype: Any = np.uint8) -> np.ndarray: """Synthetic 400x400 grayscale image — a single bright square at the centre.""" img = np.zeros((size, size), dtype=dtype) cs = size // 2 img[cs - 20 : cs + 20, cs - 20 : cs + 20] = 255 return img def _frame(frame_id: int = 1) -> NavCameraFrame: return NavCameraFrame( frame_id=frame_id, timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), image=_make_image(), camera_calibration_id="test_cam", ) def _camera_looking_down_quat() -> Quat: """Quaternion for a body-frame camera whose +Z axis points along world -Z (nadir). Built as a 180° rotation about world +X. This flips body +Z to world -Z (so a point on the ground at world +Z = -altitude appears in front of the camera) and body +Y to world -Y; the X axis is preserved. The standard pinhole projection then maps ground (X, Y) to pixel (cx + fx*X/h, cy - fy*Y/h) where ``h`` is altitude — the expected nadir homography for unit tests. """ angle = np.pi axis = np.array([1.0, 0.0, 0.0]) # q = (cos(theta/2), axis * sin(theta/2)) half = angle / 2.0 return Quat( w=float(np.cos(half)), x=float(axis[0] * np.sin(half)), y=float(axis[1] * np.sin(half)), z=float(axis[2] * np.sin(half)), ) def _output( *, lat: float = 50.45, lon: float = 30.52, alt_m: float = 100.0, cov: np.ndarray | None = None, source_label: PoseSourceLabel = PoseSourceLabel.SATELLITE_ANCHORED, quat: Quat | None = None, ) -> EstimatorOutput: if cov is None: cov = np.eye(6, dtype=np.float64) * 0.01 if quat is None: quat = _camera_looking_down_quat() return EstimatorOutput( frame_id=uuid4(), position_wgs84=LatLonAlt(lat_deg=lat, lon_deg=lon, alt_m=alt_m), orientation_world_T_body=quat, velocity_world_mps=(0.0, 0.0, 0.0), covariance_6x6=cov, source_label=source_label, last_satellite_anchor_age_ms=50, smoothed=False, emitted_at=1_000_000_000, ) class _RecordingWriter: """Fake :class:`MidFlightTileWriter` — records every call. By default returns ``(zoom, lat, lon)`` from the supplied input (i.e. simulates a successful insert). Configure ``return_value`` to a literal value (or ``None`` to simulate the C6 freshness gate rejecting the insert) to override per-test. """ def __init__(self, *, return_override: Any = ..., side_effect: Any = None) -> None: self.calls: list[dict[str, Any]] = [] self._return_override = return_override self._side_effect = side_effect def write_mid_flight_tile( self, *, jpeg_bytes: bytes, zoom_level: int, lat: float, lon: float, tile_size_meters: float, tile_size_pixels: int, capture_timestamp: datetime, flight_id: str, companion_id: str, estimator_label: str, covariance_2x2_horizontal: tuple[ tuple[float, float], tuple[float, float] ], last_anchor_age_ms: int, mre_px: float, imu_bias_norm: float, ) -> tuple[int, float, float] | None: kv = { "jpeg_bytes": jpeg_bytes, "zoom_level": zoom_level, "lat": lat, "lon": lon, "tile_size_meters": tile_size_meters, "tile_size_pixels": tile_size_pixels, "capture_timestamp": capture_timestamp, "flight_id": flight_id, "companion_id": companion_id, "estimator_label": estimator_label, "covariance_2x2_horizontal": covariance_2x2_horizontal, "last_anchor_age_ms": last_anchor_age_ms, "mre_px": mre_px, "imu_bias_norm": imu_bias_norm, } self.calls.append(kv) if self._side_effect is not None: raise self._side_effect if self._return_override is ...: return (zoom_level, lat, lon) return self._return_override def _make_orthorectifier( *, writer: MidFlightTileWriter | None = None, thresholds: OrthorectifierThresholds | None = None, calibration: CameraCalibration | None = None, ) -> tuple[Orthorectifier, _RecordingWriter]: if writer is None: writer = _RecordingWriter() return ( Orthorectifier( tile_writer=writer, thresholds=thresholds or _make_thresholds(), camera_calibration=calibration or _nadir_calibration(), flight_id=_FLIGHT_ID, companion_id=_COMPANION_ID, ), writer, # type: ignore[return-value] ) def _emit_kwargs( *, frame: NavCameraFrame | None = None, output: EstimatorOutput | None = None, cov_6x6: np.ndarray | None = None, inlier_count: int = 60, mre_px: float = 1.5, source_label: PoseSourceLabel | None = None, last_anchor_age_ms: int = 50, imu_bias_norm: float = 0.01, estimator_label: str = _ESTIMATOR_LABEL, ) -> dict[str, Any]: out = output if output is not None else _output() return { "frame": frame if frame is not None else _frame(), "pose_estimate": out, "cov_6x6": cov_6x6 if cov_6x6 is not None else out.covariance_6x6, "inlier_count": inlier_count, "mre_px": mre_px, "source_label": source_label or out.source_label, "last_anchor_age_ms": last_anchor_age_ms, "imu_bias_norm": imu_bias_norm, "estimator_label": estimator_label, } # ---------------------------------------------------------------------- # AC-2 — covariance-norm gate. def test_ac2_cov_norm_above_threshold_blocks_emission() -> None: # Arrange ortho, writer = _make_orthorectifier( thresholds=_make_thresholds(cov_norm_threshold=1.0) ) big_cov = np.eye(6, dtype=np.float64) * 10.0 out = _output(cov=big_cov) # Act result = ortho.try_emit_candidate(**_emit_kwargs(output=out, cov_6x6=big_cov)) # Assert assert result is None assert writer.calls == [] def test_ac2_cov_norm_below_threshold_emits() -> None: # Arrange ortho, writer = _make_orthorectifier( thresholds=_make_thresholds(cov_norm_threshold=10.0) ) # Act result = ortho.try_emit_candidate(**_emit_kwargs()) # Assert assert result is not None assert len(writer.calls) == 1 # ---------------------------------------------------------------------- # AC-3 — source-label gate. @pytest.mark.parametrize( "label", [PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.DEAD_RECKONED], ) def test_ac3_non_satellite_anchored_blocked(label: PoseSourceLabel) -> None: # Arrange ortho, writer = _make_orthorectifier() out = _output(source_label=label) # Act result = ortho.try_emit_candidate( **_emit_kwargs(output=out, source_label=label) ) # Assert assert result is None assert writer.calls == [] # ---------------------------------------------------------------------- # AC-4 — once-per-frame rate limit. def test_ac4_same_frame_id_processed_only_once() -> None: # Arrange ortho, writer = _make_orthorectifier() frame = _frame(frame_id=42) # Act first = ortho.try_emit_candidate(**_emit_kwargs(frame=frame)) second = ortho.try_emit_candidate(**_emit_kwargs(frame=frame)) # Assert assert first is not None assert second is None assert len(writer.calls) == 1 def test_ac4_distinct_frame_ids_each_emit() -> None: # Arrange ortho, writer = _make_orthorectifier() # Act ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1))) ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2))) # Assert assert len(writer.calls) == 2 # ---------------------------------------------------------------------- # AC-8 — defensive missing-input skip. @pytest.mark.parametrize( "missing", ["frame", "pose_estimate", "cov_6x6"], ) def test_ac8_missing_inputs_silent_return_none(missing: str) -> None: # Arrange ortho, writer = _make_orthorectifier() kwargs = _emit_kwargs() kwargs[missing] = None # Act result = ortho.try_emit_candidate(**kwargs) # Assert assert result is None assert writer.calls == [] # ---------------------------------------------------------------------- # AC-5 / AC-6 — writer is called with the right metadata + SHA256. def test_ac5_writer_called_with_onboard_ingest_metadata() -> None: # Arrange ortho, writer = _make_orthorectifier() out = _output(lat=50.45, lon=30.52, alt_m=100.0) # Act ortho.try_emit_candidate(**_emit_kwargs(output=out)) # Assert — writer received the canonical onboard-ingest payload. assert len(writer.calls) == 1 call = writer.calls[0] assert call["zoom_level"] == 18 assert call["lat"] == pytest.approx(50.45) assert call["lon"] == pytest.approx(30.52) assert call["tile_size_meters"] == pytest.approx(100.0) assert call["tile_size_pixels"] == 64 assert call["flight_id"] == _FLIGHT_ID assert call["companion_id"] == _COMPANION_ID assert call["estimator_label"] == _ESTIMATOR_LABEL assert call["last_anchor_age_ms"] == 50 assert call["mre_px"] == pytest.approx(1.5) assert call["imu_bias_norm"] == pytest.approx(0.01) cov2x2 = call["covariance_2x2_horizontal"] assert cov2x2[0][0] == pytest.approx(0.01) assert cov2x2[1][1] == pytest.approx(0.01) def test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality() -> None: # Arrange ortho, writer = _make_orthorectifier( thresholds=_make_thresholds(tile_size_pixels=128, jpeg_quality=80) ) # Act ortho.try_emit_candidate(**_emit_kwargs()) # Assert — emitted blob is a JPEG that decodes to the configured tile size. assert len(writer.calls) == 1 jpeg_bytes = writer.calls[0]["jpeg_bytes"] assert isinstance(jpeg_bytes, bytes) assert len(jpeg_bytes) > 0 assert jpeg_bytes[:3] == b"\xff\xd8\xff" # JPEG SOI marker decoded = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_UNCHANGED) assert decoded is not None assert decoded.shape[:2] == (128, 128) # ---------------------------------------------------------------------- # AC-9 — writer-side freshness rejection swallowed. def test_ac9_writer_returning_none_swallowed() -> None: # Arrange writer = _RecordingWriter(return_override=None) ortho, _ = _make_orthorectifier(writer=writer) # Act result = ortho.try_emit_candidate(**_emit_kwargs()) # Assert assert result is None assert len(writer.calls) == 1 # the writer WAS called def test_ac9_writer_raising_swallowed_with_warning(caplog: pytest.LogCaptureFixture) -> None: # Arrange writer = _RecordingWriter(side_effect=RuntimeError("infra blew up")) ortho, _ = _make_orthorectifier(writer=writer) # Act with caplog.at_level(logging.WARNING, logger="gps_denied_onboard.c5_state.orthorectifier"): result = ortho.try_emit_candidate(**_emit_kwargs()) # Assert assert result is None assert any( rec.message == "c5.state.orthorectifier_writer_failed" for rec in caplog.records ) # ---------------------------------------------------------------------- # AC-7 — first-emission INFO log. def test_ac7_first_emission_logs_info_subsequent_logs_debug( caplog: pytest.LogCaptureFixture, ) -> None: # Arrange ortho, _ = _make_orthorectifier() # Act with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c5_state.orthorectifier"): ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1))) ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2))) # Assert info_records = [r for r in caplog.records if r.levelno == logging.INFO] assert any(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) assert sum(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) == 1 # ---------------------------------------------------------------------- # Inlier-floor gate (specified alongside AC-2 in the task body). def test_inlier_floor_blocks_emission() -> None: # Arrange ortho, writer = _make_orthorectifier( thresholds=_make_thresholds(inlier_floor=50) ) # Act — inlier_count == floor is rejected (strict inequality). result = ortho.try_emit_candidate(**_emit_kwargs(inlier_count=50)) # Assert assert result is None assert writer.calls == [] # ---------------------------------------------------------------------- # AC-1 — orthorectification correctness (geometry). def test_ac1_homography_projects_origin_to_principal_point() -> None: """Camera straight down at altitude h, K = [fx 0 cx; 0 fy cy; 0 0 1]. Ground point (X=0, Y=0) lies under the camera. With the body- looking-down convention (180° about world +X), the ``world_T_cam = world_T_body`` for ``body_T_cam = I``: * Camera position in world = (0, 0, h) * Camera Z axis points along world -Z (hits ground at Z=0) The expected projection of ground (0, 0, 0) is the principal point ``(cx, cy)`` to within sub-pixel tolerance. """ # Arrange cal = _nadir_calibration() K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) out = _output(alt_m=100.0) R = _quat_to_rotation_matrix(out.orientation_world_T_body) world_T_body = np.eye(4, dtype=np.float64) world_T_body[:3, :3] = R world_T_body[2, 3] = float(out.position_wgs84.alt_m) world_T_cam = world_T_body @ body_T_cam cam_T_world = _invert_se3(world_T_cam) H = _ground_plane_homography(K=K, cam_T_world=cam_T_world) # Act — project ground origin (X=0, Y=0, 1). homog = H @ np.array([0.0, 0.0, 1.0]) pix = homog[:2] / homog[2] # Assert assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0) def test_ac1_homography_projects_offset_within_one_pixel() -> None: """Ground (X=10 m, Y=0) at altitude 100 m with fx=500 should land at cx + 50.""" # Arrange cal = _nadir_calibration() K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) out = _output(alt_m=100.0) R = _quat_to_rotation_matrix(out.orientation_world_T_body) world_T_body = np.eye(4, dtype=np.float64) world_T_body[:3, :3] = R world_T_body[2, 3] = float(out.position_wgs84.alt_m) world_T_cam = world_T_body @ body_T_cam cam_T_world = _invert_se3(world_T_cam) H = _ground_plane_homography(K=K, cam_T_world=cam_T_world) # Act homog = H @ np.array([10.0, 0.0, 1.0]) pix = homog[:2] / homog[2] # Assert — fx=500, h=100 → 10 m on the ground projects to 50 px from cx. # The 180° flip about world +X sends body +Y to world -Y so a +X ground # offset stays at +X in image space (cx + 50) on the row of the # principal point. assert pix[0] == pytest.approx(250.0, abs=1.0) assert pix[1] == pytest.approx(200.0, abs=1.0) def test_compose_tile_to_image_homography_is_centred_on_camera() -> None: """Tile pixel ``(N/2, N/2)`` corresponds to the camera nadir on the ground.""" # Arrange cal = _nadir_calibration() K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) out = _output(alt_m=100.0) R = _quat_to_rotation_matrix(out.orientation_world_T_body) world_T_body = np.eye(4, dtype=np.float64) world_T_body[:3, :3] = R world_T_body[2, 3] = float(out.position_wgs84.alt_m) world_T_cam = world_T_body @ body_T_cam cam_T_world = _invert_se3(world_T_cam) H_g2i = _ground_plane_homography(K=K, cam_T_world=cam_T_world) N = 64 H_t2i = _compose_tile_to_image_homography( H_ground_to_image=H_g2i, ground_x_center=float(world_T_cam[0, 3]), ground_y_center=float(world_T_cam[1, 3]), tile_size_meters=100.0, tile_size_pixels=N, ) # Act — middle of the tile maps to the principal point of the camera image. homog = H_t2i @ np.array([N / 2.0, N / 2.0, 1.0]) pix = homog[:2] / homog[2] # Assert assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0) # ---------------------------------------------------------------------- # Composition-root adapter — _C6MidFlightIngestAdapter. def test_adapter_translates_freshness_rejection_to_none() -> None: """The adapter catches :class:`FreshnessRejectionError` and returns ``None`` so the orthorectifier silently drops the candidate.""" # Arrange from gps_denied_onboard.components.c6_tile_cache.errors import ( FreshnessRejectionError, ) from gps_denied_onboard.runtime_root.state_factory import ( _C6MidFlightIngestAdapter, ) fake_store = mock.MagicMock() fake_store.write_tile.side_effect = FreshnessRejectionError( "rear_sector stale" ) adapter = _C6MidFlightIngestAdapter(tile_store=fake_store) # Act result = adapter.write_mid_flight_tile( jpeg_bytes=b"\xff\xd8\xff\xe0fakejpeg", zoom_level=18, lat=50.45, lon=30.52, tile_size_meters=100.0, tile_size_pixels=64, capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), flight_id=_FLIGHT_ID, companion_id=_COMPANION_ID, estimator_label=_ESTIMATOR_LABEL, covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)), last_anchor_age_ms=50, mre_px=1.5, imu_bias_norm=0.01, ) # Assert assert result is None fake_store.write_tile.assert_called_once() def test_adapter_calls_write_tile_with_onboard_ingest_metadata() -> None: """On a successful insert the adapter builds an ``ONBOARD_INGEST`` row, sets ``PENDING`` voting status, and computes the SHA256 of the JPEG.""" # Arrange from gps_denied_onboard.components.c6_tile_cache._types import ( FreshnessLabel, TileSource, VotingStatus, ) from gps_denied_onboard.runtime_root.state_factory import ( _C6MidFlightIngestAdapter, ) fake_store = mock.MagicMock() adapter = _C6MidFlightIngestAdapter(tile_store=fake_store) jpeg_bytes = b"\xff\xd8\xff\xe0testjpegblob" # Act result = adapter.write_mid_flight_tile( jpeg_bytes=jpeg_bytes, zoom_level=18, lat=50.45, lon=30.52, tile_size_meters=100.0, tile_size_pixels=64, capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), flight_id=_FLIGHT_ID, companion_id=_COMPANION_ID, estimator_label=_ESTIMATOR_LABEL, covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)), last_anchor_age_ms=50, mre_px=1.5, imu_bias_norm=0.01, ) # Assert — write_tile invoked with (blob, metadata). fake_store.write_tile.assert_called_once() args, kwargs = fake_store.write_tile.call_args assert kwargs == {} assert args[0] == jpeg_bytes metadata = args[1] assert metadata.source == TileSource.ONBOARD_INGEST assert metadata.freshness_label == FreshnessLabel.FRESH assert metadata.voting_status == VotingStatus.PENDING assert metadata.flight_id == _FLIGHT_ID assert metadata.companion_id == _COMPANION_ID assert metadata.tile_id.zoom_level == 18 assert metadata.tile_id.lat == pytest.approx(50.45) assert metadata.tile_id.lon == pytest.approx(30.52) assert metadata.content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest() quality = metadata.quality_metadata assert quality is not None assert quality.estimator_label == _ESTIMATOR_LABEL assert quality.covariance_2x2 == ((0.01, 0.0), (0.0, 0.01)) assert quality.last_anchor_age_ms == 50 assert result == (18, pytest.approx(50.45), pytest.approx(30.52)) # ---------------------------------------------------------------------- # OrthorectifierConfig validation (config block sanity). def test_thresholds_reject_non_positive_cov_norm() -> None: # Act / Assert with pytest.raises(ValueError): OrthorectifierThresholds(cov_norm_threshold=0.0, inlier_floor=10) def test_thresholds_reject_negative_inlier_floor() -> None: # Act / Assert with pytest.raises(ValueError): OrthorectifierThresholds(cov_norm_threshold=1.0, inlier_floor=-1)