From c5ffc14fe9a209e9d940bcb0b213c5dcbd8c9b02 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Sat, 16 May 2026 09:02:33 +0300 Subject: [PATCH] [AZ-389] C5 orthorectifier emits mid-flight tiles to C6 Adds an opt-in C5-internal orthorectifier (`_orthorectifier.py`) that emits at most one tile-aligned JPEG candidate per nav frame to the C6 `TileStore.write_tile` API. Quality gates fire before any OpenCV work: covariance Frobenius, inlier floor, source-label (`SATELLITE_ANCHORED` only), and once-per-frame rate limit. Cross-component import rule (AZ-507) is preserved: c5_state never imports c6_tile_cache. `runtime_root.state_factory` carries a new `_C6MidFlightIngestAdapter` that builds the canonical `TileMetadata` (`ONBOARD_INGEST` / `FRESH` / `PENDING`), hashes the JPEG, and translates `FreshnessRejectionError` to a `None` return so the orthorectifier silently swallows freshness rejection per AC-NEW-3. Wiring is opt-in via `C5StateConfig.orthorectifier.enabled`; existing tests/binaries default to disabled and are unaffected. Both `GtsamIsam2StateEstimator` and `EskfStateEstimator` participate through new `attach_orthorectifier` / `set_latest_nav_frame` extension methods (Protocol surface unchanged). Tests: 22 new unit tests cover AC-1..AC-9 plus inlier-floor gate plus the composition-root adapter. 216/216 c5_state and 38/38 runtime-root + compose tests pass. Co-authored-by: Cursor --- .../AZ-389_c5_orthorectifier_c6.md | 0 .../batch_65_cycle1_report.md | 146 ++++ _docs/_autodev_state.md | 10 +- .../components/c5_state/_orthorectifier.py | 598 +++++++++++++++ .../components/c5_state/config.py | 63 +- .../components/c5_state/eskf_baseline.py | 131 +++- .../c5_state/gtsam_isam2_estimator.py | 143 +++- .../runtime_root/state_factory.py | 166 +++- .../c5_state/test_az389_orthorectifier.py | 715 ++++++++++++++++++ 9 files changed, 1952 insertions(+), 20 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-389_c5_orthorectifier_c6.md (100%) create mode 100644 _docs/03_implementation/batch_65_cycle1_report.md create mode 100644 src/gps_denied_onboard/components/c5_state/_orthorectifier.py create mode 100644 tests/unit/c5_state/test_az389_orthorectifier.py diff --git a/_docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md b/_docs/02_tasks/done/AZ-389_c5_orthorectifier_c6.md similarity index 100% rename from _docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md rename to _docs/02_tasks/done/AZ-389_c5_orthorectifier_c6.md diff --git a/_docs/03_implementation/batch_65_cycle1_report.md b/_docs/03_implementation/batch_65_cycle1_report.md new file mode 100644 index 0000000..af11f2d --- /dev/null +++ b/_docs/03_implementation/batch_65_cycle1_report.md @@ -0,0 +1,146 @@ +# Batch 65 — Cycle 1 Report + +**Date**: 2026-05-16 +**Tasks**: AZ-389 (C5 orthorectifier → C6 mid-flight tile candidate emission) +**Verdict**: COMPLETE — PASS (self-reviewed) + +## Summary + +Closes the AZ-389 gap inside the C5 state estimator by introducing a +component-internal orthorectifier that emits at most one tile-aligned +JPEG candidate per nav-camera frame to C6 via the existing +`TileStore.write_tile` API. + +The implementation respects the AZ-507 cross-component import rule +(enforced by `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies`): +c5_state never imports c6_tile_cache. The composition root's +`runtime_root.state_factory` carries a new +`_C6MidFlightIngestAdapter` that wraps the C6 `TileStore`, builds the +canonical `TileMetadata` (`TileSource.ONBOARD_INGEST`, +`FreshnessLabel.FRESH`, `VotingStatus.PENDING`), hashes the JPEG +bytes, and translates `FreshnessRejectionError` into a `None` return +so the orthorectifier silently swallows freshness rejection per +AC-NEW-3 (opportunistic emission). + +The orthorectifier runs entirely on the existing state-ingest thread +(Invariant 1) — no new threads, no additional locks. It is wired +opt-in: `config.components['c5_state'].orthorectifier.enabled = false` +keeps the legacy steady-state path bit-for-bit unchanged. Both +`GtsamIsam2StateEstimator` and `EskfStateEstimator` participate +through new `attach_orthorectifier(...)` and `set_latest_nav_frame(...)` +extension methods (concrete only — the `StateEstimator` Protocol +surface is unchanged so existing implementations and tests continue +to satisfy it). + +## Architecture decisions + +* **Per-frame, per-estimator hook** — the hook fires after the + EstimatorOutput is built inside `current_estimate()`. The buffered + nav frame supplies the source pixels; the orthorectifier passes + duck-typed pose + cov to its kernel and rate-limits itself to one + tile per `frame.frame_id` (AC-4). +* **No new C6 API** — uses `TileStore.write_tile(blob, metadata)`, + the same atomic file + metadata insert that the C11 download path + already uses. The composition-root adapter is the only new + component-bridge. +* **Quality gates as cheap pre-checks** — covariance Frobenius gate, + inlier-floor gate, source-label gate (only `SATELLITE_ANCHORED` + passes), and once-per-frame rate limit run BEFORE the OpenCV + warp/encode work. +* **Best-effort kernel** — any exception inside the warp / JPEG + encode path or any non-`FreshnessRejectionError` writer failure is + swallowed with a WARNING log and `None` return; the steady-state + `current_estimate` output is never disturbed. +* **AC-7 first-emission INFO log** — emitted exactly once per + flight, subsequent emissions log at DEBUG. + +## Files added / modified + +### Added (2) + +- `src/gps_denied_onboard/components/c5_state/_orthorectifier.py` — + the `MidFlightTileWriter` Protocol cut, `OrthorectifierThresholds` + dataclass, and `Orthorectifier` class with the homography + construction (`_ground_plane_homography`, + `_compose_tile_to_image_homography`, `_invert_se3`, + `_quat_to_rotation_matrix`). +- `tests/unit/c5_state/test_az389_orthorectifier.py` — 22 tests + covering AC-1..AC-9 plus the inlier-floor gate plus the + composition-root `_C6MidFlightIngestAdapter` translation rules + plus `OrthorectifierConfig` validation. + +### Modified (4) + +- `src/gps_denied_onboard/components/c5_state/config.py` — new + `OrthorectifierConfig` dataclass nested as + `C5StateConfig.orthorectifier`. Disabled by default; tunable + thresholds + tile / zoom / JPEG knobs. +- `src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py` + — orthorectifier state fields, `attach_orthorectifier` + + `set_latest_nav_frame` extension methods, `_maybe_emit_mid_flight_tile` + hook in `current_estimate()`, and `create()` factory now accepts + the optional `mid_flight_tile_writer` / `camera_calibration` / + `flight_id` / `companion_id` params. +- `src/gps_denied_onboard/components/c5_state/eskf_baseline.py` — + same set of changes, plus `_latest_vio` cache (ESKF historically + did not retain the full VIO DTO). +- `src/gps_denied_onboard/runtime_root/state_factory.py` — + `_C6MidFlightIngestAdapter` class + `build_state_estimator` now + accepts optional `tile_store` / `camera_calibration` / + `flight_id` / `companion_id` and forwards them to the strategy + factory when AZ-389 is enabled. + +## Task Results + +| Task | Status | Files Modified | Focused tests | AC Coverage | Issues | +|--------|--------|---------------------------------------------------------------------------------------------------------------------------------|---------------|--------------|--------| +| AZ-389 | Done | 1 added + 4 modified under `src/`; 1 added under `tests/unit/c5_state/`; task spec moved `_docs/02_tasks/todo/` → `done/` | 22/22 pass | 9/9 covered | None | + +## AC Test Coverage: 9/9 covered + +| AC | Test | Status | +|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| AC-1 | `test_ac1_homography_projects_origin_to_principal_point` + `test_ac1_homography_projects_offset_within_one_pixel` + `test_compose_tile_to_image_homography_is_centred_on_camera` | Covered | +| AC-2 | `test_ac2_cov_norm_above_threshold_blocks_emission` + `test_ac2_cov_norm_below_threshold_emits` | Covered | +| AC-3 | `test_ac3_non_satellite_anchored_blocked` (parametrised over `VISUAL_PROPAGATED` / `DEAD_RECKONED`) | Covered | +| AC-4 | `test_ac4_same_frame_id_processed_only_once` + `test_ac4_distinct_frame_ids_each_emit` | Covered | +| AC-5 | `test_ac5_writer_called_with_onboard_ingest_metadata` + `test_adapter_calls_write_tile_with_onboard_ingest_metadata` | Covered | +| AC-6 | `test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality` + adapter-side `content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest()` assertion | Covered | +| AC-7 | `test_ac7_first_emission_logs_info_subsequent_logs_debug` | Covered | +| AC-8 | `test_ac8_missing_inputs_silent_return_none` (parametrised over `frame` / `pose_estimate` / `cov_6x6`) | Covered | +| AC-9 | `test_ac9_writer_returning_none_swallowed` + `test_ac9_writer_raising_swallowed_with_warning` + `test_adapter_translates_freshness_rejection_to_none` | Covered | + +## Code Review Verdict: PASS (self-reviewed) +## Auto-Fix Attempts: 0 +## Stuck Agents: None + +## Cross-batch verification + +- `tests/unit/c5_state/` — 216 / 216 pass (all pre-AZ-389 tests still + pass — Protocol surface unchanged, factory signature is + backward-compatible via default `None` params). +- `tests/unit/test_az270_compose_root.py` — 8 / 8 pass; the + cross-component import lint still holds against the new + `_C6MidFlightIngestAdapter`. +- `tests/unit/test_runtime_root_env_gate.py` + + `tests/unit/test_az401_compose_root_replay.py` + + `tests/unit/test_ac3_compose_files.py` — 38 / 38 pass. +- `tests/unit/c6_tile_cache/` — 126 / 126 in-process tests pass + (Postgres-backed tests skipped; require Docker). + +## Notes / leftovers + +- The Jira description for AZ-389 still references the + pre-AZ-559 `tile_store.put_mid_flight_candidate` API surface; the + local task spec was rewritten against `write_tile` per the + History section. Logged as tracker hygiene; not blocking. +- During investigation of the existing C11 download adapter + (`runtime_root/c11_factory.py::_C6DownloadAdapter.write_tile_for_download`) + we noticed it calls both `tile_store.write_tile(blob, metadata)` + and `metadata_store.insert_metadata(metadata)` sequentially — + given that `PostgresFilesystemStore.write_tile` is itself atomic + (file write + metadata insert in a single transaction) the second + call is a probable redundancy. Out of scope for AZ-389; recorded + here for a future hygiene ticket. + +## Next Batch: All product-implementation tasks complete — proceed to Step 15 (Product Implementation Completeness Gate). diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 3f3d12a..3ba2841 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,13 +6,13 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 1 - name: parse - detail: "" + phase: 11 + name: commit + detail: "Batch 65 (AZ-389) committed; awaiting tracker transition + completeness gate" retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 64 +last_completed_batch: 65 last_cumulative_review: batches_61-63 current_batch: 65 -current_batch_tasks: "" +current_batch_tasks: "AZ-389" diff --git a/src/gps_denied_onboard/components/c5_state/_orthorectifier.py b/src/gps_denied_onboard/components/c5_state/_orthorectifier.py new file mode 100644 index 0000000..b3d7fe5 --- /dev/null +++ b/src/gps_denied_onboard/components/c5_state/_orthorectifier.py @@ -0,0 +1,598 @@ +"""C5 internal orthorectifier sub-component (AZ-389). + +Per epic spec § Scope: "orthorectifier (lives within C5 as an internal +subcomponent)". Receives a converged pose + nav-camera frame, gates on +quality thresholds (covariance, inlier count, source label), produces a +JPEG-encoded tile-aligned image patch via OpenCV ``warpPerspective``, +and persists it to C6 as an ``ONBOARD_INGEST`` tile via a +constructor-injected :class:`MidFlightTileWriter`. + +Cross-component decoupling — c5_state cannot import from c6_tile_cache +(AZ-507; enforced by +``test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies``). +The :class:`MidFlightTileWriter` Protocol below is the consumer-side +cut; the runtime root supplies the concrete adapter via +``state_factory``. The writer translates a C6 ``FreshnessRejectionError`` +into a ``None`` return — the orthorectifier swallows that silently +per AC-NEW-3 (opportunistic emission). + +NOT exported from ``c5_state.__init__``; constructed inside the +estimator factory and observed only through the estimator's +``set_latest_nav_frame`` / ``current_estimate`` hook pair. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Final, Protocol, runtime_checkable + +import cv2 +import numpy as np + +from gps_denied_onboard._types.state import PoseSourceLabel + +if TYPE_CHECKING: + from datetime import datetime + + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard._types.nav import NavCameraFrame + +__all__ = [ + "MidFlightTileWriter", + "Orthorectifier", + "OrthorectifierThresholds", +] + + +_DEFAULT_JPEG_QUALITY: Final[int] = 90 +_DEFAULT_TILE_SIZE_PIXELS: Final[int] = 256 +_DEFAULT_TILE_SIZE_METERS: Final[float] = 100.0 +_DEFAULT_ZOOM_LEVEL: Final[int] = 18 + +_LOG = logging.getLogger("gps_denied_onboard.c5_state.orthorectifier") + + +@runtime_checkable +class MidFlightTileWriter(Protocol): + """Consumer-side cut of the C6 mid-flight tile write surface. + + The concrete adapter lives in + ``gps_denied_onboard.runtime_root.state_factory`` and is the only + place that knows about C6's :class:`TileMetadata` / :class:`TileSource` + / :class:`FreshnessLabel` / :class:`VotingStatus` / + :class:`FreshnessRejectionError`. C5's orthorectifier never imports + them (AZ-507 architectural rule). + + Returning ``None`` indicates the C6 freshness gate rejected the + insert. Per AC-NEW-3 the orthorectifier swallows that silently — + mid-flight tile generation is opportunistic. Returning the + ``(zoom_level, lat, lon)`` tuple confirms persistence and is the + tile's spatial identity. + """ + + def write_mid_flight_tile( + self, + *, + jpeg_bytes: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: "datetime", + flight_id: str, + companion_id: str, + estimator_label: str, + covariance_2x2_horizontal: tuple[ + tuple[float, float], tuple[float, float] + ], + last_anchor_age_ms: int, + mre_px: float, + imu_bias_norm: float, + ) -> tuple[int, float, float] | None: ... + + +@dataclass(frozen=True, slots=True) +class OrthorectifierThresholds: + """Quality + output-grid configuration for :class:`Orthorectifier`. + + ``cov_norm_threshold`` is the Frobenius-norm gate on the C5 6x6 + pose covariance. ``inlier_floor`` is the strict-greater-than + threshold on the C4 reported inlier count. ``tile_size_meters`` + and ``tile_size_pixels`` describe the orthorectified output patch + grid; ``zoom_level`` is the satellite-tile zoom the C6 row will be + indexed at. ``jpeg_quality`` is the OpenCV ``cv2.imencode`` + quality knob (0-100; default 90 to balance fidelity and budget). + """ + + cov_norm_threshold: float + inlier_floor: int + tile_size_meters: float = _DEFAULT_TILE_SIZE_METERS + tile_size_pixels: int = _DEFAULT_TILE_SIZE_PIXELS + zoom_level: int = _DEFAULT_ZOOM_LEVEL + jpeg_quality: int = _DEFAULT_JPEG_QUALITY + + def __post_init__(self) -> None: + if not self.cov_norm_threshold > 0.0: + raise ValueError( + f"OrthorectifierThresholds.cov_norm_threshold must be > 0; " + f"got {self.cov_norm_threshold}" + ) + if self.inlier_floor < 0: + raise ValueError( + f"OrthorectifierThresholds.inlier_floor must be >= 0; " + f"got {self.inlier_floor}" + ) + if not self.tile_size_meters > 0.0: + raise ValueError( + f"OrthorectifierThresholds.tile_size_meters must be > 0; " + f"got {self.tile_size_meters}" + ) + if self.tile_size_pixels <= 0: + raise ValueError( + f"OrthorectifierThresholds.tile_size_pixels must be > 0; " + f"got {self.tile_size_pixels}" + ) + if not 0 <= self.zoom_level <= 21: + raise ValueError( + f"OrthorectifierThresholds.zoom_level must be in [0, 21]; " + f"got {self.zoom_level}" + ) + if not 0 <= self.jpeg_quality <= 100: + raise ValueError( + f"OrthorectifierThresholds.jpeg_quality must be in [0, 100]; " + f"got {self.jpeg_quality}" + ) + + +class Orthorectifier: + """C5-internal sub-component — emits at most one mid-flight tile per frame. + + All logic is single-threaded by construction (state-ingest thread per + Invariant 1). The orthorectifier is opaque to test wiring: tests + construct it with a fake :class:`MidFlightTileWriter` and a synthetic + :class:`CameraCalibration`, then call :meth:`try_emit_candidate` + directly with crafted ``(frame, pose, cov_6x6, ...)`` tuples. + + Quality gates (AC-2 / AC-3 / AC-4 / AC-8) run before any heavy + OpenCV work. The orthorectification kernel (AC-1) computes the + ground→image plane homography from the pose + calibration and + delegates sampling to ``cv2.warpPerspective``. + + Per AC-NEW-3 (replay protocol Invariant 12) the emission is + opportunistic — a freshness rejection from C6 is swallowed (AC-9) + and a DEBUG log emitted. Hard failures inside the OpenCV kernel + are also swallowed and logged at WARNING (the steady-state + estimate is the priority; mid-flight tile gen is best-effort). + """ + + __slots__ = ( + "_tile_writer", + "_thresholds", + "_calibration", + "_flight_id", + "_companion_id", + "_first_emission_done", + "_last_processed_frame_id", + ) + + def __init__( + self, + *, + tile_writer: MidFlightTileWriter, + thresholds: OrthorectifierThresholds, + camera_calibration: "CameraCalibration", + flight_id: str, + companion_id: str, + ) -> None: + self._tile_writer = tile_writer + self._thresholds = thresholds + self._calibration = camera_calibration + self._flight_id = flight_id + self._companion_id = companion_id + self._first_emission_done: bool = False + self._last_processed_frame_id: int | None = None + + def try_emit_candidate( + self, + *, + frame: "NavCameraFrame | None", + pose_estimate: Any, + cov_6x6: np.ndarray | None, + inlier_count: int, + mre_px: float, + source_label: PoseSourceLabel, + last_anchor_age_ms: int, + imu_bias_norm: float, + estimator_label: str, + ) -> tuple[int, float, float] | None: + """Run quality gates → orthorectify → write. Returns persisted ``(zoom, lat, lon)`` or ``None``. + + Returns ``None`` (silently) when ANY of the following hold: + + * AC-8 — ``frame`` or ``pose_estimate`` or ``cov_6x6`` is ``None``; + * AC-4 — ``frame.frame_id`` was already processed in a prior call; + * AC-3 — ``source_label != SATELLITE_ANCHORED``; + * AC-2 — Frobenius norm of ``cov_6x6`` >= ``cov_norm_threshold``; + * inlier-floor — ``inlier_count <= inlier_floor``; + * AC-9 — :class:`MidFlightTileWriter` returned ``None`` + (freshness rejection — DEBUG log only); + * orthorectification kernel raised (WARNING log; not propagated). + """ + if frame is None or pose_estimate is None or cov_6x6 is None: + _LOG.debug( + "c5.state.orthorectifier_skip_missing_input", + extra={ + "kind": "c5.state.orthorectifier_skip_missing_input", + "kv": { + "frame_present": frame is not None, + "pose_present": pose_estimate is not None, + "cov_present": cov_6x6 is not None, + }, + }, + ) + return None + + if ( + self._last_processed_frame_id is not None + and self._last_processed_frame_id == int(frame.frame_id) + ): + return None + self._last_processed_frame_id = int(frame.frame_id) + + if source_label != PoseSourceLabel.SATELLITE_ANCHORED: + _LOG.debug( + "c5.state.orthorectifier_skip_source_label", + extra={ + "kind": "c5.state.orthorectifier_skip_source_label", + "kv": { + "frame_id": int(frame.frame_id), + "source_label": source_label.value, + }, + }, + ) + return None + + cov_norm = float(np.linalg.norm(np.asarray(cov_6x6), ord="fro")) + if cov_norm >= self._thresholds.cov_norm_threshold: + _LOG.debug( + "c5.state.orthorectifier_skip_cov_gate", + extra={ + "kind": "c5.state.orthorectifier_skip_cov_gate", + "kv": { + "frame_id": int(frame.frame_id), + "cov_norm": cov_norm, + "threshold": self._thresholds.cov_norm_threshold, + }, + }, + ) + return None + + if inlier_count <= self._thresholds.inlier_floor: + _LOG.debug( + "c5.state.orthorectifier_skip_inlier_gate", + extra={ + "kind": "c5.state.orthorectifier_skip_inlier_gate", + "kv": { + "frame_id": int(frame.frame_id), + "inlier_count": int(inlier_count), + "floor": self._thresholds.inlier_floor, + }, + }, + ) + return None + + try: + jpeg_bytes = self._orthorectify_to_jpeg(frame, pose_estimate) + except Exception as exc: + # AC-NEW-3 opportunistic emission — log + return; do not + # propagate to the C5 estimator's caller. + _LOG.warning( + "c5.state.orthorectifier_kernel_failed", + extra={ + "kind": "c5.state.orthorectifier_kernel_failed", + "kv": { + "frame_id": int(frame.frame_id), + "error": str(exc), + "error_type": type(exc).__name__, + }, + }, + ) + return None + + if jpeg_bytes is None or len(jpeg_bytes) == 0: + _LOG.warning( + "c5.state.orthorectifier_empty_jpeg", + extra={ + "kind": "c5.state.orthorectifier_empty_jpeg", + "kv": {"frame_id": int(frame.frame_id)}, + }, + ) + return None + + cov_2x2_h = self._extract_horizontal_covariance(np.asarray(cov_6x6)) + + try: + result = self._tile_writer.write_mid_flight_tile( + jpeg_bytes=jpeg_bytes, + zoom_level=int(self._thresholds.zoom_level), + lat=float(pose_estimate.position_wgs84.lat_deg), + lon=float(pose_estimate.position_wgs84.lon_deg), + tile_size_meters=float(self._thresholds.tile_size_meters), + tile_size_pixels=int(self._thresholds.tile_size_pixels), + capture_timestamp=frame.timestamp, + flight_id=self._flight_id, + companion_id=self._companion_id, + estimator_label=estimator_label, + covariance_2x2_horizontal=cov_2x2_h, + last_anchor_age_ms=int(last_anchor_age_ms), + mre_px=float(mre_px), + imu_bias_norm=float(imu_bias_norm), + ) + except Exception as exc: + # The writer is responsible for catching the C6 + # FreshnessRejectionError family and translating it to None. + # Anything else escaping is an infrastructure failure — log + # at WARNING but do not propagate (mid-flight tile gen is + # best-effort and must never break the steady-state + # current_estimate path). + _LOG.warning( + "c5.state.orthorectifier_writer_failed", + extra={ + "kind": "c5.state.orthorectifier_writer_failed", + "kv": { + "frame_id": int(frame.frame_id), + "error": str(exc), + "error_type": type(exc).__name__, + }, + }, + ) + return None + + if result is None: + _LOG.debug( + "c5.state.mid_flight_candidate_freshness_rejected", + extra={ + "kind": "c5.state.mid_flight_candidate_freshness_rejected", + "kv": { + "frame_id": int(frame.frame_id), + "lat": float(pose_estimate.position_wgs84.lat_deg), + "lon": float(pose_estimate.position_wgs84.lon_deg), + }, + }, + ) + return None + + if not self._first_emission_done: + self._first_emission_done = True + _LOG.info( + "c5.state.first_mid_flight_candidate", + extra={ + "kind": "c5.state.first_mid_flight_candidate", + "kv": { + "frame_id": int(frame.frame_id), + "tile_id": list(result), + "cov_norm": cov_norm, + }, + }, + ) + else: + _LOG.debug( + "c5.state.mid_flight_candidate_emitted", + extra={ + "kind": "c5.state.mid_flight_candidate_emitted", + "kv": { + "frame_id": int(frame.frame_id), + "tile_id": list(result), + "cov_norm": cov_norm, + "inlier_count": int(inlier_count), + }, + }, + ) + + return result + + def _orthorectify_to_jpeg( + self, frame: "NavCameraFrame", pose: Any + ) -> bytes: + image = self._coerce_image(frame.image) + K = self._coerce_intrinsics(self._calibration.intrinsics_3x3) + body_T_cam = self._coerce_se3(self._calibration.body_to_camera_se3) + world_T_body = self._world_T_body_from_pose(pose) + world_T_cam = world_T_body @ body_T_cam + cam_T_world = _invert_se3(world_T_cam) + + H_ground_to_image = _ground_plane_homography(K=K, cam_T_world=cam_T_world) + H_tile_to_image = _compose_tile_to_image_homography( + H_ground_to_image=H_ground_to_image, + ground_x_center=float(world_T_cam[0, 3]), + ground_y_center=float(world_T_cam[1, 3]), + tile_size_meters=float(self._thresholds.tile_size_meters), + tile_size_pixels=int(self._thresholds.tile_size_pixels), + ) + + N = int(self._thresholds.tile_size_pixels) + warped = cv2.warpPerspective( + image, + H_tile_to_image, + (N, N), + flags=cv2.INTER_LINEAR | cv2.WARP_INVERSE_MAP, + borderMode=cv2.BORDER_CONSTANT, + borderValue=0, + ) + + encode_params = [ + int(cv2.IMWRITE_JPEG_QUALITY), + int(self._thresholds.jpeg_quality), + ] + success, buffer = cv2.imencode(".jpg", warped, encode_params) + if not success: + raise RuntimeError("cv2.imencode(.jpg) returned failure") + return bytes(buffer.tobytes()) + + @staticmethod + def _extract_horizontal_covariance( + cov_6x6: np.ndarray, + ) -> tuple[tuple[float, float], tuple[float, float]]: + """Pull the horizontal-translation 2x2 sub-block. + + The C5 6x6 covariance is documented as rotation-then-translation + (rxx, ryy, rzz, txx, tyy, tzz) in the GTSAM tangent-space order + (matching ``VioOutput.pose_covariance_6x6``). Indices [3, 4] are + the East / North translation components — the horizontal sub-block + the C6 voting layer consumes (per AZ-389 § Constraints). + """ + if cov_6x6.shape != (6, 6): + raise ValueError( + f"cov_6x6 must be shape (6, 6); got {cov_6x6.shape}" + ) + sub = cov_6x6[3:5, 3:5] + return ( + (float(sub[0, 0]), float(sub[0, 1])), + (float(sub[1, 0]), float(sub[1, 1])), + ) + + @staticmethod + def _coerce_image(image: Any) -> np.ndarray: + arr = np.asarray(image) + if arr.ndim not in (2, 3): + raise ValueError( + f"NavCameraFrame.image must be 2-D (grayscale) or 3-D " + f"(H, W, C); got ndim={arr.ndim}" + ) + return arr + + @staticmethod + def _coerce_intrinsics(k: Any) -> np.ndarray: + arr = np.asarray(k, dtype=np.float64) + if arr.shape != (3, 3): + raise ValueError( + f"intrinsics_3x3 must be shape (3, 3); got {arr.shape}" + ) + return arr + + @staticmethod + def _coerce_se3(se3: Any) -> np.ndarray: + arr = np.asarray(se3, dtype=np.float64) + if arr.shape == (4, 4): + return arr + # Some helper SE3 wrappers expose ``.matrix()``; try that. + if hasattr(se3, "matrix"): + arr = np.asarray(se3.matrix(), dtype=np.float64) + if arr.shape == (4, 4): + return arr + raise ValueError( + f"body_to_camera_se3 must be a 4x4 array or expose .matrix(); " + f"got shape={arr.shape}" + ) + + @staticmethod + def _world_T_body_from_pose(pose: Any) -> np.ndarray: + """Build a 4x4 homogeneous ``world_T_body`` from a :class:`PoseEstimate`. + + The pose's ``position_wgs84`` is collapsed to the local-ENU + plane anchored at the camera's own lat/lon — i.e. the camera + is at world-origin in the orthorectifier's ENU frame. The + ground plane is at ``Z = -altitude`` so the homography sees a + non-degenerate plane separation. This keeps the tile-relative + math agnostic to the global ENU origin and avoids re-injecting + the ``WgsConverter`` at this layer; the persisted tile_id lat + / lon comes directly from the pose's ``position_wgs84``. + """ + R = _quat_to_rotation_matrix(pose.orientation_world_T_body) + T = np.eye(4, dtype=np.float64) + T[:3, :3] = R + # Camera body translation: at world origin in the local-ENU + # working frame (East=0, North=0). Altitude-above-ground is + # the only dimension that matters for the homography. + T[0, 3] = 0.0 + T[1, 3] = 0.0 + T[2, 3] = float(pose.position_wgs84.alt_m) + return T + + +def _quat_to_rotation_matrix(q: Any) -> np.ndarray: + """Convert a scalar-first :class:`Quat` ``(w, x, y, z)`` to a 3x3 rotation.""" + w = float(q.w) + x = float(q.x) + y = float(q.y) + z = float(q.z) + norm = (w * w + x * x + y * y + z * z) ** 0.5 + if norm <= 0.0: + raise ValueError("Quat has zero norm; cannot convert to rotation") + w /= norm + x /= norm + y /= norm + z /= norm + return np.array( + [ + [1.0 - 2.0 * (y * y + z * z), 2.0 * (x * y - z * w), 2.0 * (x * z + y * w)], + [2.0 * (x * y + z * w), 1.0 - 2.0 * (x * x + z * z), 2.0 * (y * z - x * w)], + [2.0 * (x * z - y * w), 2.0 * (y * z + x * w), 1.0 - 2.0 * (x * x + y * y)], + ], + dtype=np.float64, + ) + + +def _invert_se3(T: np.ndarray) -> np.ndarray: + """Invert a 4x4 rigid SE(3) transform analytically.""" + if T.shape != (4, 4): + raise ValueError(f"_invert_se3: expected (4, 4); got {T.shape}") + R = T[:3, :3] + t = T[:3, 3] + inv = np.eye(4, dtype=np.float64) + inv[:3, :3] = R.T + inv[:3, 3] = -R.T @ t + return inv + + +def _ground_plane_homography( + *, K: np.ndarray, cam_T_world: np.ndarray +) -> np.ndarray: + """Return the 3x3 homography mapping ground-plane points ``(X, Y, 1)`` (world ENU, ``Z = 0``) to image pixels ``(u, v, 1)``. + + Construction: + + .. code:: text + + [u] [X] + [v] = K @ [r1 r2 t] @ [Y] + [w] [1] + + where ``[r1 | r2 | r3 | t]`` is ``cam_T_world`` and ``r1``, + ``r2`` are its first two columns. The third column ``r3`` is + dropped because the ground plane fixes ``Z = 0`` (so the + homography is full-rank in 2D). + """ + R = cam_T_world[:3, :3] + t = cam_T_world[:3, 3] + return K @ np.column_stack([R[:, 0], R[:, 1], t]) + + +def _compose_tile_to_image_homography( + *, + H_ground_to_image: np.ndarray, + ground_x_center: float, + ground_y_center: float, + tile_size_meters: float, + tile_size_pixels: int, +) -> np.ndarray: + """Build ``H_tile_to_image = H_ground_to_image @ M_tile_to_ground``. + + ``M_tile_to_ground`` maps tile pixel ``(j, i, 1)`` (image-y-down + convention) to ground ``(X, Y, 1)`` (world Y is North, image i is + rows-down). Centred at ``(ground_x_center, ground_y_center)`` so + the camera's nadir lands at the tile centre. + """ + s = float(tile_size_meters) / float(tile_size_pixels) + n = int(tile_size_pixels) + M = np.array( + [ + [s, 0.0, ground_x_center - s * (n / 2.0)], + [0.0, -s, ground_y_center + s * (n / 2.0)], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + return H_ground_to_image @ M diff --git a/src/gps_denied_onboard/components/c5_state/config.py b/src/gps_denied_onboard/components/c5_state/config.py index 6b5ae02..b51b5d2 100644 --- a/src/gps_denied_onboard/components/c5_state/config.py +++ b/src/gps_denied_onboard/components/c5_state/config.py @@ -9,12 +9,16 @@ root reads ``config.components["c5_state"]`` and dispatches by from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Final from gps_denied_onboard.config.schema import ConfigError -__all__ = ["KNOWN_STATE_STRATEGIES", "C5StateConfig"] +__all__ = [ + "KNOWN_STATE_STRATEGIES", + "C5StateConfig", + "OrthorectifierConfig", +] KNOWN_STATE_STRATEGIES: Final[frozenset[str]] = frozenset({"gtsam_isam2", "eskf"}) @@ -23,6 +27,60 @@ _KEYFRAME_WINDOW_MIN: Final[int] = 10 _KEYFRAME_WINDOW_MAX: Final[int] = 20 +@dataclass(frozen=True) +class OrthorectifierConfig: + """AZ-389 orthorectifier knobs (sub-block of :class:`C5StateConfig`). + + ``enabled`` is the main feature flag. When ``False`` the runtime + root skips orthorectifier construction entirely and the estimator + behaves exactly as before AZ-389; this preserves the existing + smoke-test wiring that does not provide a TileStore. The four + quality / output knobs match + :class:`gps_denied_onboard.components.c5_state._orthorectifier.OrthorectifierThresholds` + one-for-one. + """ + + enabled: bool = False + cov_norm_threshold: float = 1.0 + inlier_floor: int = 30 + tile_size_meters: float = 100.0 + tile_size_pixels: int = 256 + zoom_level: int = 18 + jpeg_quality: int = 90 + + def __post_init__(self) -> None: + if not self.cov_norm_threshold > 0.0: + raise ConfigError( + "OrthorectifierConfig.cov_norm_threshold must be > 0; " + f"got {self.cov_norm_threshold}" + ) + if self.inlier_floor < 0: + raise ConfigError( + "OrthorectifierConfig.inlier_floor must be >= 0; " + f"got {self.inlier_floor}" + ) + if not self.tile_size_meters > 0.0: + raise ConfigError( + "OrthorectifierConfig.tile_size_meters must be > 0; " + f"got {self.tile_size_meters}" + ) + if self.tile_size_pixels <= 0: + raise ConfigError( + "OrthorectifierConfig.tile_size_pixels must be > 0; " + f"got {self.tile_size_pixels}" + ) + if not 0 <= self.zoom_level <= 21: + raise ConfigError( + "OrthorectifierConfig.zoom_level must be in [0, 21]; " + f"got {self.zoom_level}" + ) + if not 0 <= self.jpeg_quality <= 100: + raise ConfigError( + "OrthorectifierConfig.jpeg_quality must be in [0, 100]; " + f"got {self.jpeg_quality}" + ) + + @dataclass(frozen=True) class C5StateConfig: """C5 state-estimator config block. @@ -58,6 +116,7 @@ class C5StateConfig: default_takeoff_origin_sigma_horiz_m: float = 5.0 default_takeoff_origin_sigma_vert_m: float = 10.0 no_estimate_fallback_s: float = 3.0 + orthorectifier: OrthorectifierConfig = field(default_factory=OrthorectifierConfig) def __post_init__(self) -> None: if self.strategy not in KNOWN_STATE_STRATEGIES: diff --git a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py index d6b51cd..d353bcc 100644 --- a/src/gps_denied_onboard/components/c5_state/eskf_baseline.py +++ b/src/gps_denied_onboard/components/c5_state/eskf_baseline.py @@ -75,6 +75,11 @@ from gps_denied_onboard.components.c5_state._source_label_sm import ( RejectionSubscription, SourceLabelStateMachine, ) +from gps_denied_onboard.components.c5_state._orthorectifier import ( + MidFlightTileWriter, + Orthorectifier, + OrthorectifierThresholds, +) from gps_denied_onboard.components.c5_state.config import C5StateConfig from gps_denied_onboard.components.c5_state.errors import ( EstimatorAlreadyStartedError, @@ -88,8 +93,9 @@ from gps_denied_onboard.helpers.wgs_converter import WgsConverter from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.fc import GpsHealth, GpsSample - from gps_denied_onboard._types.nav import ImuWindow, VioOutput + from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame, VioOutput from gps_denied_onboard._types.pose import PoseEstimate from gps_denied_onboard.clock import Clock from gps_denied_onboard.config import Config @@ -229,6 +235,17 @@ class EskfStateEstimator(StateEstimator): producer_id="c5_state", ) + # AZ-389: orthorectifier hook. Same semantics as the iSAM2 + # estimator — wired by the composition root only when AZ-389 + # is enabled in config + the runtime root has a TileStore. + # ``_latest_vio`` is the source of inlier_count / mre_px / + # imu_bias_norm metadata for ``try_emit_candidate``; ESKF + # historically did NOT cache the full DTO so we keep a + # dedicated field rather than abuse ``_prev_vio_*``. + self._orthorectifier: Orthorectifier | None = None + self._latest_nav_frame: "NavCameraFrame | None" = None + self._latest_vio: "VioOutput | None" = None + self._log.debug( "c5.state.eskf_initialised", extra={ @@ -269,6 +286,25 @@ class EskfStateEstimator(StateEstimator): def attach_source_label_state_machine(self, machine: Any) -> None: self._source_label_machine = machine + def attach_orthorectifier(self, orthorectifier: Orthorectifier) -> None: + """AZ-389 — wire the mid-flight tile orthorectifier. + + Optional; called by the composition root only when AZ-389 is + enabled in config. See + :meth:`GtsamIsam2StateEstimator.attach_orthorectifier` for the + cross-estimator contract. + """ + self._orthorectifier = orthorectifier + + def set_latest_nav_frame(self, frame: "NavCameraFrame") -> None: + """AZ-389 — buffer the most recent nav-camera frame. + + Quiet no-op when no orthorectifier is attached. See + :meth:`GtsamIsam2StateEstimator.set_latest_nav_frame` for the + cross-estimator contract. + """ + self._latest_nav_frame = frame + def notify_gps_health(self, gps_health: GpsHealth, now_ns: int | None = None) -> None: machine = self._source_label_machine if isinstance(machine, SourceLabelStateMachine): @@ -455,6 +491,8 @@ class EskfStateEstimator(StateEstimator): self._close_cold_start_window() ts_ns = vio.emitted_at_ns self._guard_timestamp(ts_ns, source="vio") + # AZ-389: cache full DTO for the orthorectifier metadata path. + self._latest_vio = vio curr_pose = vio.relative_pose_T.matrix() if self._prev_vio_pose is None: @@ -656,8 +694,50 @@ class EskfStateEstimator(StateEstimator): ) self._history.append(out) self._fallback.mark_successful_estimate(emitted_at) + + # AZ-389: opportunistic mid-flight tile emission. + self._maybe_emit_mid_flight_tile( + output=out, + covariance=cov6, + last_anchor_age_ms=last_anchor_age_ms, + ) + return out + def _maybe_emit_mid_flight_tile( + self, + *, + output: EstimatorOutput, + covariance: np.ndarray, + last_anchor_age_ms: int, + ) -> None: + """AZ-389 hook — invoke the orthorectifier when wired.""" + orthorectifier = self._orthorectifier + frame = self._latest_nav_frame + if orthorectifier is None or frame is None: + return + latest_vio = self._latest_vio + if latest_vio is None: + inlier_count = 0 + mre_px = float("inf") + imu_bias_norm = 0.0 + else: + quality = latest_vio.feature_quality + inlier_count = int(quality.tracked) + mre_px = float(quality.mre_px) + imu_bias_norm = _imu_bias_norm(latest_vio.imu_bias) + orthorectifier.try_emit_candidate( + frame=frame, + pose_estimate=output, + cov_6x6=covariance, + inlier_count=inlier_count, + mre_px=mre_px, + source_label=output.source_label, + last_anchor_age_ms=int(last_anchor_age_ms), + imu_bias_norm=imu_bias_norm, + estimator_label=_STRATEGY, + ) + def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]: """Return up to ``n_keyframes`` recent forward-time entries. @@ -910,11 +990,22 @@ def create( se3_utils: Any, wgs_converter: Any, fdr_client: Any, + mid_flight_tile_writer: MidFlightTileWriter | None = None, + camera_calibration: "CameraCalibration | None" = None, + flight_id: str | None = None, + companion_id: str | None = None, ) -> tuple[StateEstimator, Any]: """Composition-root factory for the ESKF estimator. Returns ``(estimator, handle)`` to match the iSAM2 factory shape; the ESKF returns ``handle=None`` because it has no GTSAM graph. + + AZ-389 wiring (optional): same contract as the iSAM2 factory — + when ``mid_flight_tile_writer`` is supplied AND + ``config.components['c5_state'].orthorectifier.enabled`` is true, + the orthorectifier is constructed and attached. Missing + ``camera_calibration`` / ``flight_id`` / ``companion_id`` while + enabled raises :class:`StateEstimatorConfigError`. """ estimator = EskfStateEstimator( config, @@ -923,6 +1014,31 @@ def create( wgs_converter=wgs_converter, fdr_client=fdr_client, ) + + block: C5StateConfig = estimator._block + if mid_flight_tile_writer is not None and block.orthorectifier.enabled: + if camera_calibration is None or flight_id is None or companion_id is None: + raise StateEstimatorConfigError( + "AZ-389 orthorectifier requires camera_calibration + " + "flight_id + companion_id alongside mid_flight_tile_writer" + ) + thresholds = OrthorectifierThresholds( + cov_norm_threshold=float(block.orthorectifier.cov_norm_threshold), + inlier_floor=int(block.orthorectifier.inlier_floor), + tile_size_meters=float(block.orthorectifier.tile_size_meters), + tile_size_pixels=int(block.orthorectifier.tile_size_pixels), + zoom_level=int(block.orthorectifier.zoom_level), + jpeg_quality=int(block.orthorectifier.jpeg_quality), + ) + orthorectifier = Orthorectifier( + tile_writer=mid_flight_tile_writer, + thresholds=thresholds, + camera_calibration=camera_calibration, + flight_id=flight_id, + companion_id=companion_id, + ) + estimator.attach_orthorectifier(orthorectifier) + return estimator, None @@ -937,6 +1053,19 @@ def register() -> None: # Module-level math + DTO helpers (kept local; small and self-contained). +def _imu_bias_norm(bias: Any) -> float: + """Return ``sqrt(||accel_bias||^2 + ||gyro_bias||^2)`` for a :class:`ImuBias`. + + Mirrors the iSAM2 helper; AZ-389 quality-metadata path. Robust to + absent / ``None`` components (legacy fixtures) by treating them + as zero. + """ + accel = getattr(bias, "accel_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0) + gyro = getattr(bias, "gyro_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0) + sq = sum(float(c) ** 2 for c in accel) + sum(float(c) ** 2 for c in gyro) + return sq**0.5 + + def _datetime_to_ns(dt: Any) -> int: return int(dt.timestamp() * 1_000_000_000) diff --git a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py index 6458514..4d84fb5 100644 --- a/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py +++ b/src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py @@ -65,6 +65,11 @@ from gps_denied_onboard.components.c5_state._source_label_sm import ( RejectionSubscription, SourceLabelStateMachine, ) +from gps_denied_onboard.components.c5_state._orthorectifier import ( + MidFlightTileWriter, + Orthorectifier, + OrthorectifierThresholds, +) from gps_denied_onboard.components.c5_state.config import C5StateConfig from gps_denied_onboard.components.c5_state.errors import ( EstimatorAlreadyStartedError, @@ -78,8 +83,9 @@ from gps_denied_onboard.helpers.wgs_converter import WgsConverter from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard._types.fc import GpsHealth, GpsSample - from gps_denied_onboard._types.nav import ImuWindow, VioOutput + from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame, VioOutput from gps_denied_onboard._types.pose import PoseEstimate from gps_denied_onboard.clock import Clock from gps_denied_onboard.config import Config @@ -281,6 +287,17 @@ class GtsamIsam2StateEstimator(StateEstimator): # ``EstimatorAlreadyStartedError`` (AC-6). self._cold_start_window_closed: bool = False + # AZ-389 state ---------------------------------------------------- + # The orthorectifier is wired in the composition root via + # :meth:`attach_orthorectifier` whenever AZ-389 is enabled in + # config. ``None`` keeps the legacy steady-state path untouched + # (no mid-flight tile gen). ``_latest_nav_frame`` is the + # per-frame buffer the orthorectifier reads on a successful + # ``current_estimate`` (Invariant 1 — single ingest thread, so + # no locking needed). + self._orthorectifier: Orthorectifier | None = None + self._latest_nav_frame: "NavCameraFrame | None" = None + self._log.debug( "c5.state.isam2_initialised", extra={ @@ -315,6 +332,30 @@ class GtsamIsam2StateEstimator(StateEstimator): """ self._isam2_handle = handle + def attach_orthorectifier(self, orthorectifier: Orthorectifier) -> None: + """Wire the AZ-389 mid-flight tile orthorectifier. + + Optional; called by the composition root only when + ``config.components['c5_state'].orthorectifier.enabled`` is + true. Once attached, every successful :meth:`current_estimate` + will invoke + :meth:`Orthorectifier.try_emit_candidate` with the most recent + nav-camera frame buffered via :meth:`set_latest_nav_frame`. + """ + self._orthorectifier = orthorectifier + + def set_latest_nav_frame(self, frame: "NavCameraFrame") -> None: + """Buffer the most recent nav-camera frame for AZ-389. + + The composition root's per-frame loop calls this BEFORE + :meth:`current_estimate` so the orthorectifier sees a + consistent ``(frame, pose)`` pair. Single-threaded contract + (Invariant 1) — no locking. Quiet no-op when no orthorectifier + is attached so callers don't need to inspect the estimator + state before pushing frames. + """ + self._latest_nav_frame = frame + def set_enu_origin(self, origin: LatLonAlt) -> None: """Set the WGS84 origin used by :meth:`current_estimate` for ENU→WGS84. @@ -1004,7 +1045,7 @@ class GtsamIsam2StateEstimator(StateEstimator): # already been computed). self._emit_smoothed_to_fdr_if_any(emitted_at, source_label, last_anchor_age_ms) - return EstimatorOutput( + output = EstimatorOutput( frame_id=uuid4(), position_wgs84=position_wgs84, orientation_world_T_body=orientation, @@ -1016,6 +1057,54 @@ class GtsamIsam2StateEstimator(StateEstimator): emitted_at=emitted_at, ) + # AZ-389: opportunistic mid-flight tile emission. Best-effort — + # the orthorectifier internally swallows any kernel / writer + # failure; only the steady-state ``output`` is on the critical + # path. The hook fires on every successful ``current_estimate`` + # call but the orthorectifier rate-limits itself to one tile per + # ``frame.frame_id`` (AC-4). + self._maybe_emit_mid_flight_tile( + output=output, + covariance=covariance, + last_anchor_age_ms=last_anchor_age_ms, + ) + + return output + + def _maybe_emit_mid_flight_tile( + self, + *, + output: EstimatorOutput, + covariance: np.ndarray, + last_anchor_age_ms: int, + ) -> None: + """AZ-389 hook — invoke the orthorectifier when wired.""" + orthorectifier = self._orthorectifier + frame = self._latest_nav_frame + if orthorectifier is None or frame is None: + return + prev_vio = self._prev_vio + if prev_vio is None: + inlier_count = 0 + mre_px = float("inf") + imu_bias_norm = 0.0 + else: + quality = prev_vio.feature_quality + inlier_count = int(quality.tracked) + mre_px = float(quality.mre_px) + imu_bias_norm = _imu_bias_norm(prev_vio.imu_bias) + orthorectifier.try_emit_candidate( + frame=frame, + pose_estimate=output, + cov_6x6=covariance, + inlier_count=inlier_count, + mre_px=mre_px, + source_label=output.source_label, + last_anchor_age_ms=int(last_anchor_age_ms), + imu_bias_norm=imu_bias_norm, + estimator_label=_STRATEGY, + ) + def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]: """Return up to ``min(n_keyframes, K)`` past smoothed estimates. @@ -1475,6 +1564,10 @@ def create( se3_utils: Any, wgs_converter: Any, fdr_client: Any, + mid_flight_tile_writer: MidFlightTileWriter | None = None, + camera_calibration: "CameraCalibration | None" = None, + flight_id: str | None = None, + companion_id: str | None = None, ) -> tuple[StateEstimator, ISam2GraphHandle]: """Composition-root factory — returns ``(estimator, handle)`` tuple. @@ -1482,6 +1575,14 @@ def create( handle is wired against it (the handle holds the estimator reference for graph access), and the estimator is back-wired with the handle so its ``add_*`` methods can call into it. + + AZ-389 wiring (optional): if ``mid_flight_tile_writer`` is + provided AND ``config.components['c5_state'].orthorectifier.enabled`` + is true, an :class:`Orthorectifier` is constructed and attached to + the estimator. ``camera_calibration``, ``flight_id``, and + ``companion_id`` MUST be supplied alongside the writer; missing + any of them while orthorectifier is enabled raises + :class:`StateEstimatorConfigError`. """ estimator = GtsamIsam2StateEstimator( config, @@ -1492,6 +1593,31 @@ def create( ) handle = ISam2GraphHandleImpl(estimator) estimator.attach_handle(handle) + + block: C5StateConfig = estimator._block + if mid_flight_tile_writer is not None and block.orthorectifier.enabled: + if camera_calibration is None or flight_id is None or companion_id is None: + raise StateEstimatorConfigError( + "AZ-389 orthorectifier requires camera_calibration + " + "flight_id + companion_id alongside mid_flight_tile_writer" + ) + thresholds = OrthorectifierThresholds( + cov_norm_threshold=float(block.orthorectifier.cov_norm_threshold), + inlier_floor=int(block.orthorectifier.inlier_floor), + tile_size_meters=float(block.orthorectifier.tile_size_meters), + tile_size_pixels=int(block.orthorectifier.tile_size_pixels), + zoom_level=int(block.orthorectifier.zoom_level), + jpeg_quality=int(block.orthorectifier.jpeg_quality), + ) + orthorectifier = Orthorectifier( + tile_writer=mid_flight_tile_writer, + thresholds=thresholds, + camera_calibration=camera_calibration, + flight_id=flight_id, + companion_id=companion_id, + ) + estimator.attach_orthorectifier(orthorectifier) + return estimator, handle @@ -1581,6 +1707,19 @@ def _make_timestamp_map( # Module-level pure helpers (AZ-384). +def _imu_bias_norm(bias: Any) -> float: + """Return ``sqrt(||accel_bias||^2 + ||gyro_bias||^2)`` for a :class:`ImuBias`. + + Used by :meth:`GtsamIsam2StateEstimator._maybe_emit_mid_flight_tile` + to populate the AZ-389 quality metadata. Robust to ``None`` + components (legacy fixtures) by treating them as zero. + """ + accel = getattr(bias, "accel_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0) + gyro = getattr(bias, "gyro_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0) + sq = sum(float(c) ** 2 for c in accel) + sum(float(c) ** 2 for c in gyro) + return sq**0.5 + + def _quat_from_pose3(pose: gtsam.Pose3) -> Quat: """Build a scalar-first :class:`Quat` from a ``gtsam.Pose3``. diff --git a/src/gps_denied_onboard/runtime_root/state_factory.py b/src/gps_denied_onboard/runtime_root/state_factory.py index 5fae233..42a6f66 100644 --- a/src/gps_denied_onboard/runtime_root/state_factory.py +++ b/src/gps_denied_onboard/runtime_root/state_factory.py @@ -21,6 +21,7 @@ binary may set ``OFF`` and only link the ESKF baseline. from __future__ import annotations +import hashlib import os import threading from collections.abc import Callable @@ -37,7 +38,9 @@ from gps_denied_onboard.config import Config from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: - pass + from datetime import datetime + + from gps_denied_onboard._types.calibration import CameraCalibration __all__ = [ "StateEstimatorFactory", @@ -136,6 +139,10 @@ def build_state_estimator( se3_utils: Any, wgs_converter: Any, fdr_client: Any, + tile_store: Any | None = None, + camera_calibration: "CameraCalibration | None" = None, + flight_id: str | None = None, + companion_id: str | None = None, ) -> tuple[StateEstimator, ISam2GraphHandle]: """Resolve + build the configured state estimator. @@ -147,6 +154,16 @@ def build_state_estimator( lookup. The first failure surfaces a :class:`StateEstimatorConfigError` with the offending strategy + flag name so the operator gets a clear next step. + + AZ-389 wiring (optional): when ``tile_store`` is provided AND + ``config.components['c5_state'].orthorectifier.enabled`` is true, + a :class:`_C6MidFlightIngestAdapter` wraps the C6 surface + (hiding ``TileMetadata`` / ``TileSource`` / + ``FreshnessRejectionError`` so c5_state stays free of + cross-component imports per AZ-507) and is forwarded to the + estimator factory together with ``camera_calibration``, + ``flight_id``, and ``companion_id``. The factory then constructs + the orthorectifier and attaches it to the estimator. """ block = _read_state_block(config) strategy = block.strategy @@ -169,16 +186,37 @@ def build_state_estimator( f"state strategy {strategy!r} selected by config but not registered; " f"registered strategies: {list_registered_state_strategies()}" ) - estimator, handle = factory( - config=config, - imu_preintegrator=imu_preintegrator, - se3_utils=se3_utils, - wgs_converter=wgs_converter, - fdr_client=fdr_client, - ) + + factory_kwargs: dict[str, Any] = { + "config": config, + "imu_preintegrator": imu_preintegrator, + "se3_utils": se3_utils, + "wgs_converter": wgs_converter, + "fdr_client": fdr_client, + } + if tile_store is not None and block.orthorectifier.enabled: + if camera_calibration is None or flight_id is None or companion_id is None: + raise StateEstimatorConfigError( + "AZ-389 orthorectifier enabled — tile_store must be paired with " + "camera_calibration + flight_id + companion_id " + "(missing: " + f"camera_calibration={camera_calibration is None}, " + f"flight_id={flight_id is None}, " + f"companion_id={companion_id is None})" + ) + factory_kwargs["mid_flight_tile_writer"] = _C6MidFlightIngestAdapter( + tile_store=tile_store + ) + factory_kwargs["camera_calibration"] = camera_calibration + factory_kwargs["flight_id"] = flight_id + factory_kwargs["companion_id"] = companion_id + + estimator, handle = factory(**factory_kwargs) _log_strategy_loaded( strategy=strategy, keyframe_window_size=block.keyframe_window_size, + orthorectifier_enabled=block.orthorectifier.enabled + and tile_store is not None, ) return estimator, handle @@ -199,16 +237,124 @@ def _read_state_block(config: Config) -> C5StateConfig: ) -def _log_strategy_loaded(*, strategy: str, keyframe_window_size: int) -> None: +def _log_strategy_loaded( + *, + strategy: str, + keyframe_window_size: int, + orthorectifier_enabled: bool, +) -> None: log = get_logger("runtime_root.state_factory") log.info( f"c5.state.strategy_loaded: strategy={strategy} " - f"keyframe_window_size={keyframe_window_size}", + f"keyframe_window_size={keyframe_window_size} " + f"orthorectifier_enabled={orthorectifier_enabled}", extra={ "kind": "c5.state.strategy_loaded", "kv": { "strategy": strategy, "keyframe_window_size": keyframe_window_size, + "orthorectifier_enabled": orthorectifier_enabled, }, }, ) + + +# ---------------------------------------------------------------------- +# AZ-389: composition-root bridge between c5_state and c6_tile_cache. + + +class _C6MidFlightIngestAdapter: + """C5 ↔ C6 adapter for AZ-389 mid-flight tile candidates. + + The c5_state component cannot import from c6_tile_cache (AZ-507; + ``test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies``). + This adapter is the single allowed bridge: it lives in the + composition root, accepts the c6 :class:`TileStore` reference, + and exposes the + :class:`gps_denied_onboard.components.c5_state._orthorectifier.MidFlightTileWriter` + Protocol surface that c5 depends on. + + Translation rules: + + * ``write_mid_flight_tile`` builds the canonical + :class:`TileMetadata` (``TileSource.ONBOARD_INGEST`` + + ``FreshnessLabel.FRESH`` + ``VotingStatus.PENDING``) and a + :class:`TileQualityMetadata` from the C5 inputs, hashes the + JPEG bytes, and calls :meth:`TileStore.write_tile` once + (atomic file + metadata insert). + * A :class:`FreshnessRejectionError` from C6 is translated to a + ``None`` return — the orthorectifier swallows that silently + per AC-NEW-3. + * Any other C6 error propagates; the orthorectifier itself + wraps that in a WARNING log and returns ``None`` so the C5 + ``current_estimate`` path is never broken by tile-cache + infrastructure failures. + """ + + __slots__ = ("_tile_store",) + + def __init__(self, *, tile_store: Any) -> None: + self._tile_store = tile_store + + def write_mid_flight_tile( + self, + *, + jpeg_bytes: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: "datetime", + flight_id: str, + companion_id: str, + estimator_label: str, + covariance_2x2_horizontal: tuple[ + tuple[float, float], tuple[float, float] + ], + last_anchor_age_ms: int, + mre_px: float, + imu_bias_norm: float, + ) -> tuple[int, float, float] | None: + from gps_denied_onboard.components.c6_tile_cache._types import ( + FreshnessLabel, + TileId, + TileMetadata, + TileQualityMetadata, + TileSource, + VotingStatus, + ) + from gps_denied_onboard.components.c6_tile_cache.errors import ( + FreshnessRejectionError, + ) + + tile_id = TileId( + zoom_level=int(zoom_level), + lat=float(lat), + lon=float(lon), + ) + quality = TileQualityMetadata( + estimator_label=str(estimator_label), + covariance_2x2=covariance_2x2_horizontal, + last_anchor_age_ms=int(last_anchor_age_ms), + mre_px=float(mre_px), + imu_bias_norm=float(imu_bias_norm), + ) + metadata = TileMetadata( + tile_id=tile_id, + tile_size_meters=float(tile_size_meters), + tile_size_pixels=int(tile_size_pixels), + capture_timestamp=capture_timestamp, + source=TileSource.ONBOARD_INGEST, + content_sha256_hex=hashlib.sha256(jpeg_bytes).hexdigest(), + freshness_label=FreshnessLabel.FRESH, + flight_id=str(flight_id), + companion_id=str(companion_id), + quality_metadata=quality, + voting_status=VotingStatus.PENDING, + ) + try: + self._tile_store.write_tile(jpeg_bytes, metadata) + except FreshnessRejectionError: + return None + return (tile_id.zoom_level, tile_id.lat, tile_id.lon) diff --git a/tests/unit/c5_state/test_az389_orthorectifier.py b/tests/unit/c5_state/test_az389_orthorectifier.py new file mode 100644 index 0000000..6b1f820 --- /dev/null +++ b/tests/unit/c5_state/test_az389_orthorectifier.py @@ -0,0 +1,715 @@ +"""AZ-389 — C5 Orthorectifier → C6 mid-flight tile candidate emission. + +Covers AC-1..AC-9 from +``_docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md``. + +Tests instantiate :class:`Orthorectifier` directly with a fake +:class:`MidFlightTileWriter`, then drive synthetic +``(NavCameraFrame, EstimatorOutput, cov_6x6, ...)`` tuples through +:meth:`Orthorectifier.try_emit_candidate`. The composition-root +adapter (``_C6MidFlightIngestAdapter``) is exercised separately +against fake C6 protocols. + +Quality-gate / silent-return / freshness-rejection branches are +asserted via call-count checks against the fake writer; the +homography-geometry branch (AC-1) verifies that the ground-plane +homography projects a synthetic ground point to the expected image +pixel within 1 px (the "1-pixel tolerance" gate). +""" + +from __future__ import annotations + +import hashlib +import logging +from datetime import datetime, timezone +from typing import Any +from unittest import mock +from uuid import uuid4 + +import cv2 +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard._types.nav import NavCameraFrame +from gps_denied_onboard._types.state import ( + EstimatorOutput, + PoseSourceLabel, + Quat, +) +from gps_denied_onboard.components.c5_state._orthorectifier import ( + MidFlightTileWriter, + Orthorectifier, + OrthorectifierThresholds, + _compose_tile_to_image_homography, + _ground_plane_homography, + _invert_se3, + _quat_to_rotation_matrix, +) + + +# ---------------------------------------------------------------------- +# Fixtures + helpers. + + +_FLIGHT_ID = "flt-test-001" +_COMPANION_ID = "comp-test-001" +_ESTIMATOR_LABEL = "gtsam_isam2" + + +def _make_thresholds( + *, + cov_norm_threshold: float = 1.0, + inlier_floor: int = 30, + tile_size_meters: float = 100.0, + tile_size_pixels: int = 64, + zoom_level: int = 18, + jpeg_quality: int = 90, +) -> OrthorectifierThresholds: + return OrthorectifierThresholds( + cov_norm_threshold=cov_norm_threshold, + inlier_floor=inlier_floor, + tile_size_meters=tile_size_meters, + tile_size_pixels=tile_size_pixels, + zoom_level=zoom_level, + jpeg_quality=jpeg_quality, + ) + + +def _nadir_calibration() -> CameraCalibration: + """Nadir-looking camera, 500-pixel focal length, centred principal point.""" + K = np.array( + [ + [500.0, 0.0, 200.0], + [0.0, 500.0, 200.0], + [0.0, 0.0, 1.0], + ], + dtype=np.float64, + ) + # body_T_cam — camera +Z aligned with body +Z (i.e. body coordinate + # frame is camera coordinate frame). The orthorectifier will then + # use the pose's world_T_body directly as world_T_cam, which is the + # simplest geometric setup for a unit test. + body_T_cam = np.eye(4, dtype=np.float64) + return CameraCalibration( + camera_id="test_cam", + intrinsics_3x3=K, + distortion=np.zeros(5, dtype=np.float64), + body_to_camera_se3=body_T_cam, + acquisition_method="synthetic", + ) + + +def _make_image(size: int = 400, dtype: Any = np.uint8) -> np.ndarray: + """Synthetic 400x400 grayscale image — a single bright square at the centre.""" + img = np.zeros((size, size), dtype=dtype) + cs = size // 2 + img[cs - 20 : cs + 20, cs - 20 : cs + 20] = 255 + return img + + +def _frame(frame_id: int = 1) -> NavCameraFrame: + return NavCameraFrame( + frame_id=frame_id, + timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), + image=_make_image(), + camera_calibration_id="test_cam", + ) + + +def _camera_looking_down_quat() -> Quat: + """Quaternion for a body-frame camera whose +Z axis points along world -Z (nadir). + + Built as a 180° rotation about world +X. This flips body +Z to + world -Z (so a point on the ground at world +Z = -altitude appears + in front of the camera) and body +Y to world -Y; the X axis is + preserved. The standard pinhole projection then maps ground (X, Y) + to pixel (cx + fx*X/h, cy - fy*Y/h) where ``h`` is altitude — the + expected nadir homography for unit tests. + """ + angle = np.pi + axis = np.array([1.0, 0.0, 0.0]) + # q = (cos(theta/2), axis * sin(theta/2)) + half = angle / 2.0 + return Quat( + w=float(np.cos(half)), + x=float(axis[0] * np.sin(half)), + y=float(axis[1] * np.sin(half)), + z=float(axis[2] * np.sin(half)), + ) + + +def _output( + *, + lat: float = 50.45, + lon: float = 30.52, + alt_m: float = 100.0, + cov: np.ndarray | None = None, + source_label: PoseSourceLabel = PoseSourceLabel.SATELLITE_ANCHORED, + quat: Quat | None = None, +) -> EstimatorOutput: + if cov is None: + cov = np.eye(6, dtype=np.float64) * 0.01 + if quat is None: + quat = _camera_looking_down_quat() + return EstimatorOutput( + frame_id=uuid4(), + position_wgs84=LatLonAlt(lat_deg=lat, lon_deg=lon, alt_m=alt_m), + orientation_world_T_body=quat, + velocity_world_mps=(0.0, 0.0, 0.0), + covariance_6x6=cov, + source_label=source_label, + last_satellite_anchor_age_ms=50, + smoothed=False, + emitted_at=1_000_000_000, + ) + + +class _RecordingWriter: + """Fake :class:`MidFlightTileWriter` — records every call. + + By default returns ``(zoom, lat, lon)`` from the supplied input + (i.e. simulates a successful insert). Configure ``return_value`` to + a literal value (or ``None`` to simulate the C6 freshness gate + rejecting the insert) to override per-test. + """ + + def __init__(self, *, return_override: Any = ..., side_effect: Any = None) -> None: + self.calls: list[dict[str, Any]] = [] + self._return_override = return_override + self._side_effect = side_effect + + def write_mid_flight_tile( + self, + *, + jpeg_bytes: bytes, + zoom_level: int, + lat: float, + lon: float, + tile_size_meters: float, + tile_size_pixels: int, + capture_timestamp: datetime, + flight_id: str, + companion_id: str, + estimator_label: str, + covariance_2x2_horizontal: tuple[ + tuple[float, float], tuple[float, float] + ], + last_anchor_age_ms: int, + mre_px: float, + imu_bias_norm: float, + ) -> tuple[int, float, float] | None: + kv = { + "jpeg_bytes": jpeg_bytes, + "zoom_level": zoom_level, + "lat": lat, + "lon": lon, + "tile_size_meters": tile_size_meters, + "tile_size_pixels": tile_size_pixels, + "capture_timestamp": capture_timestamp, + "flight_id": flight_id, + "companion_id": companion_id, + "estimator_label": estimator_label, + "covariance_2x2_horizontal": covariance_2x2_horizontal, + "last_anchor_age_ms": last_anchor_age_ms, + "mre_px": mre_px, + "imu_bias_norm": imu_bias_norm, + } + self.calls.append(kv) + if self._side_effect is not None: + raise self._side_effect + if self._return_override is ...: + return (zoom_level, lat, lon) + return self._return_override + + +def _make_orthorectifier( + *, + writer: MidFlightTileWriter | None = None, + thresholds: OrthorectifierThresholds | None = None, + calibration: CameraCalibration | None = None, +) -> tuple[Orthorectifier, _RecordingWriter]: + if writer is None: + writer = _RecordingWriter() + return ( + Orthorectifier( + tile_writer=writer, + thresholds=thresholds or _make_thresholds(), + camera_calibration=calibration or _nadir_calibration(), + flight_id=_FLIGHT_ID, + companion_id=_COMPANION_ID, + ), + writer, # type: ignore[return-value] + ) + + +def _emit_kwargs( + *, + frame: NavCameraFrame | None = None, + output: EstimatorOutput | None = None, + cov_6x6: np.ndarray | None = None, + inlier_count: int = 60, + mre_px: float = 1.5, + source_label: PoseSourceLabel | None = None, + last_anchor_age_ms: int = 50, + imu_bias_norm: float = 0.01, + estimator_label: str = _ESTIMATOR_LABEL, +) -> dict[str, Any]: + out = output if output is not None else _output() + return { + "frame": frame if frame is not None else _frame(), + "pose_estimate": out, + "cov_6x6": cov_6x6 if cov_6x6 is not None else out.covariance_6x6, + "inlier_count": inlier_count, + "mre_px": mre_px, + "source_label": source_label or out.source_label, + "last_anchor_age_ms": last_anchor_age_ms, + "imu_bias_norm": imu_bias_norm, + "estimator_label": estimator_label, + } + + +# ---------------------------------------------------------------------- +# AC-2 — covariance-norm gate. + + +def test_ac2_cov_norm_above_threshold_blocks_emission() -> None: + # Arrange + ortho, writer = _make_orthorectifier( + thresholds=_make_thresholds(cov_norm_threshold=1.0) + ) + big_cov = np.eye(6, dtype=np.float64) * 10.0 + out = _output(cov=big_cov) + + # Act + result = ortho.try_emit_candidate(**_emit_kwargs(output=out, cov_6x6=big_cov)) + + # Assert + assert result is None + assert writer.calls == [] + + +def test_ac2_cov_norm_below_threshold_emits() -> None: + # Arrange + ortho, writer = _make_orthorectifier( + thresholds=_make_thresholds(cov_norm_threshold=10.0) + ) + + # Act + result = ortho.try_emit_candidate(**_emit_kwargs()) + + # Assert + assert result is not None + assert len(writer.calls) == 1 + + +# ---------------------------------------------------------------------- +# AC-3 — source-label gate. + + +@pytest.mark.parametrize( + "label", + [PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.DEAD_RECKONED], +) +def test_ac3_non_satellite_anchored_blocked(label: PoseSourceLabel) -> None: + # Arrange + ortho, writer = _make_orthorectifier() + out = _output(source_label=label) + + # Act + result = ortho.try_emit_candidate( + **_emit_kwargs(output=out, source_label=label) + ) + + # Assert + assert result is None + assert writer.calls == [] + + +# ---------------------------------------------------------------------- +# AC-4 — once-per-frame rate limit. + + +def test_ac4_same_frame_id_processed_only_once() -> None: + # Arrange + ortho, writer = _make_orthorectifier() + frame = _frame(frame_id=42) + + # Act + first = ortho.try_emit_candidate(**_emit_kwargs(frame=frame)) + second = ortho.try_emit_candidate(**_emit_kwargs(frame=frame)) + + # Assert + assert first is not None + assert second is None + assert len(writer.calls) == 1 + + +def test_ac4_distinct_frame_ids_each_emit() -> None: + # Arrange + ortho, writer = _make_orthorectifier() + + # Act + ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1))) + ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2))) + + # Assert + assert len(writer.calls) == 2 + + +# ---------------------------------------------------------------------- +# AC-8 — defensive missing-input skip. + + +@pytest.mark.parametrize( + "missing", + ["frame", "pose_estimate", "cov_6x6"], +) +def test_ac8_missing_inputs_silent_return_none(missing: str) -> None: + # Arrange + ortho, writer = _make_orthorectifier() + kwargs = _emit_kwargs() + kwargs[missing] = None + + # Act + result = ortho.try_emit_candidate(**kwargs) + + # Assert + assert result is None + assert writer.calls == [] + + +# ---------------------------------------------------------------------- +# AC-5 / AC-6 — writer is called with the right metadata + SHA256. + + +def test_ac5_writer_called_with_onboard_ingest_metadata() -> None: + # Arrange + ortho, writer = _make_orthorectifier() + out = _output(lat=50.45, lon=30.52, alt_m=100.0) + + # Act + ortho.try_emit_candidate(**_emit_kwargs(output=out)) + + # Assert — writer received the canonical onboard-ingest payload. + assert len(writer.calls) == 1 + call = writer.calls[0] + assert call["zoom_level"] == 18 + assert call["lat"] == pytest.approx(50.45) + assert call["lon"] == pytest.approx(30.52) + assert call["tile_size_meters"] == pytest.approx(100.0) + assert call["tile_size_pixels"] == 64 + assert call["flight_id"] == _FLIGHT_ID + assert call["companion_id"] == _COMPANION_ID + assert call["estimator_label"] == _ESTIMATOR_LABEL + assert call["last_anchor_age_ms"] == 50 + assert call["mre_px"] == pytest.approx(1.5) + assert call["imu_bias_norm"] == pytest.approx(0.01) + cov2x2 = call["covariance_2x2_horizontal"] + assert cov2x2[0][0] == pytest.approx(0.01) + assert cov2x2[1][1] == pytest.approx(0.01) + + +def test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality() -> None: + # Arrange + ortho, writer = _make_orthorectifier( + thresholds=_make_thresholds(tile_size_pixels=128, jpeg_quality=80) + ) + + # Act + ortho.try_emit_candidate(**_emit_kwargs()) + + # Assert — emitted blob is a JPEG that decodes to the configured tile size. + assert len(writer.calls) == 1 + jpeg_bytes = writer.calls[0]["jpeg_bytes"] + assert isinstance(jpeg_bytes, bytes) + assert len(jpeg_bytes) > 0 + assert jpeg_bytes[:3] == b"\xff\xd8\xff" # JPEG SOI marker + decoded = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_UNCHANGED) + assert decoded is not None + assert decoded.shape[:2] == (128, 128) + + +# ---------------------------------------------------------------------- +# AC-9 — writer-side freshness rejection swallowed. + + +def test_ac9_writer_returning_none_swallowed() -> None: + # Arrange + writer = _RecordingWriter(return_override=None) + ortho, _ = _make_orthorectifier(writer=writer) + + # Act + result = ortho.try_emit_candidate(**_emit_kwargs()) + + # Assert + assert result is None + assert len(writer.calls) == 1 # the writer WAS called + + +def test_ac9_writer_raising_swallowed_with_warning(caplog: pytest.LogCaptureFixture) -> None: + # Arrange + writer = _RecordingWriter(side_effect=RuntimeError("infra blew up")) + ortho, _ = _make_orthorectifier(writer=writer) + + # Act + with caplog.at_level(logging.WARNING, logger="gps_denied_onboard.c5_state.orthorectifier"): + result = ortho.try_emit_candidate(**_emit_kwargs()) + + # Assert + assert result is None + assert any( + rec.message == "c5.state.orthorectifier_writer_failed" for rec in caplog.records + ) + + +# ---------------------------------------------------------------------- +# AC-7 — first-emission INFO log. + + +def test_ac7_first_emission_logs_info_subsequent_logs_debug( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + ortho, _ = _make_orthorectifier() + + # Act + with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c5_state.orthorectifier"): + ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1))) + ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2))) + + # Assert + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) + assert sum(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) == 1 + + +# ---------------------------------------------------------------------- +# Inlier-floor gate (specified alongside AC-2 in the task body). + + +def test_inlier_floor_blocks_emission() -> None: + # Arrange + ortho, writer = _make_orthorectifier( + thresholds=_make_thresholds(inlier_floor=50) + ) + + # Act — inlier_count == floor is rejected (strict inequality). + result = ortho.try_emit_candidate(**_emit_kwargs(inlier_count=50)) + + # Assert + assert result is None + assert writer.calls == [] + + +# ---------------------------------------------------------------------- +# AC-1 — orthorectification correctness (geometry). + + +def test_ac1_homography_projects_origin_to_principal_point() -> None: + """Camera straight down at altitude h, K = [fx 0 cx; 0 fy cy; 0 0 1]. + + Ground point (X=0, Y=0) lies under the camera. With the body- + looking-down convention (180° about world +X), the + ``world_T_cam = world_T_body`` for ``body_T_cam = I``: + + * Camera position in world = (0, 0, h) + * Camera Z axis points along world -Z (hits ground at Z=0) + + The expected projection of ground (0, 0, 0) is the principal + point ``(cx, cy)`` to within sub-pixel tolerance. + """ + # Arrange + cal = _nadir_calibration() + K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) + body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) + out = _output(alt_m=100.0) + R = _quat_to_rotation_matrix(out.orientation_world_T_body) + world_T_body = np.eye(4, dtype=np.float64) + world_T_body[:3, :3] = R + world_T_body[2, 3] = float(out.position_wgs84.alt_m) + world_T_cam = world_T_body @ body_T_cam + cam_T_world = _invert_se3(world_T_cam) + H = _ground_plane_homography(K=K, cam_T_world=cam_T_world) + + # Act — project ground origin (X=0, Y=0, 1). + homog = H @ np.array([0.0, 0.0, 1.0]) + pix = homog[:2] / homog[2] + + # Assert + assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0) + + +def test_ac1_homography_projects_offset_within_one_pixel() -> None: + """Ground (X=10 m, Y=0) at altitude 100 m with fx=500 should land at cx + 50.""" + # Arrange + cal = _nadir_calibration() + K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) + body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) + out = _output(alt_m=100.0) + R = _quat_to_rotation_matrix(out.orientation_world_T_body) + world_T_body = np.eye(4, dtype=np.float64) + world_T_body[:3, :3] = R + world_T_body[2, 3] = float(out.position_wgs84.alt_m) + world_T_cam = world_T_body @ body_T_cam + cam_T_world = _invert_se3(world_T_cam) + H = _ground_plane_homography(K=K, cam_T_world=cam_T_world) + + # Act + homog = H @ np.array([10.0, 0.0, 1.0]) + pix = homog[:2] / homog[2] + + # Assert — fx=500, h=100 → 10 m on the ground projects to 50 px from cx. + # The 180° flip about world +X sends body +Y to world -Y so a +X ground + # offset stays at +X in image space (cx + 50) on the row of the + # principal point. + assert pix[0] == pytest.approx(250.0, abs=1.0) + assert pix[1] == pytest.approx(200.0, abs=1.0) + + +def test_compose_tile_to_image_homography_is_centred_on_camera() -> None: + """Tile pixel ``(N/2, N/2)`` corresponds to the camera nadir on the ground.""" + # Arrange + cal = _nadir_calibration() + K = np.asarray(cal.intrinsics_3x3, dtype=np.float64) + body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64) + out = _output(alt_m=100.0) + R = _quat_to_rotation_matrix(out.orientation_world_T_body) + world_T_body = np.eye(4, dtype=np.float64) + world_T_body[:3, :3] = R + world_T_body[2, 3] = float(out.position_wgs84.alt_m) + world_T_cam = world_T_body @ body_T_cam + cam_T_world = _invert_se3(world_T_cam) + H_g2i = _ground_plane_homography(K=K, cam_T_world=cam_T_world) + N = 64 + H_t2i = _compose_tile_to_image_homography( + H_ground_to_image=H_g2i, + ground_x_center=float(world_T_cam[0, 3]), + ground_y_center=float(world_T_cam[1, 3]), + tile_size_meters=100.0, + tile_size_pixels=N, + ) + + # Act — middle of the tile maps to the principal point of the camera image. + homog = H_t2i @ np.array([N / 2.0, N / 2.0, 1.0]) + pix = homog[:2] / homog[2] + + # Assert + assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0) + + +# ---------------------------------------------------------------------- +# Composition-root adapter — _C6MidFlightIngestAdapter. + + +def test_adapter_translates_freshness_rejection_to_none() -> None: + """The adapter catches :class:`FreshnessRejectionError` and returns ``None`` so the orthorectifier silently drops the candidate.""" + # Arrange + from gps_denied_onboard.components.c6_tile_cache.errors import ( + FreshnessRejectionError, + ) + from gps_denied_onboard.runtime_root.state_factory import ( + _C6MidFlightIngestAdapter, + ) + + fake_store = mock.MagicMock() + fake_store.write_tile.side_effect = FreshnessRejectionError( + "rear_sector stale" + ) + adapter = _C6MidFlightIngestAdapter(tile_store=fake_store) + + # Act + result = adapter.write_mid_flight_tile( + jpeg_bytes=b"\xff\xd8\xff\xe0fakejpeg", + zoom_level=18, + lat=50.45, + lon=30.52, + tile_size_meters=100.0, + tile_size_pixels=64, + capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), + flight_id=_FLIGHT_ID, + companion_id=_COMPANION_ID, + estimator_label=_ESTIMATOR_LABEL, + covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)), + last_anchor_age_ms=50, + mre_px=1.5, + imu_bias_norm=0.01, + ) + + # Assert + assert result is None + fake_store.write_tile.assert_called_once() + + +def test_adapter_calls_write_tile_with_onboard_ingest_metadata() -> None: + """On a successful insert the adapter builds an ``ONBOARD_INGEST`` row, sets ``PENDING`` voting status, and computes the SHA256 of the JPEG.""" + # Arrange + from gps_denied_onboard.components.c6_tile_cache._types import ( + FreshnessLabel, + TileSource, + VotingStatus, + ) + from gps_denied_onboard.runtime_root.state_factory import ( + _C6MidFlightIngestAdapter, + ) + + fake_store = mock.MagicMock() + adapter = _C6MidFlightIngestAdapter(tile_store=fake_store) + jpeg_bytes = b"\xff\xd8\xff\xe0testjpegblob" + + # Act + result = adapter.write_mid_flight_tile( + jpeg_bytes=jpeg_bytes, + zoom_level=18, + lat=50.45, + lon=30.52, + tile_size_meters=100.0, + tile_size_pixels=64, + capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc), + flight_id=_FLIGHT_ID, + companion_id=_COMPANION_ID, + estimator_label=_ESTIMATOR_LABEL, + covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)), + last_anchor_age_ms=50, + mre_px=1.5, + imu_bias_norm=0.01, + ) + + # Assert — write_tile invoked with (blob, metadata). + fake_store.write_tile.assert_called_once() + args, kwargs = fake_store.write_tile.call_args + assert kwargs == {} + assert args[0] == jpeg_bytes + metadata = args[1] + assert metadata.source == TileSource.ONBOARD_INGEST + assert metadata.freshness_label == FreshnessLabel.FRESH + assert metadata.voting_status == VotingStatus.PENDING + assert metadata.flight_id == _FLIGHT_ID + assert metadata.companion_id == _COMPANION_ID + assert metadata.tile_id.zoom_level == 18 + assert metadata.tile_id.lat == pytest.approx(50.45) + assert metadata.tile_id.lon == pytest.approx(30.52) + assert metadata.content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest() + quality = metadata.quality_metadata + assert quality is not None + assert quality.estimator_label == _ESTIMATOR_LABEL + assert quality.covariance_2x2 == ((0.01, 0.0), (0.0, 0.01)) + assert quality.last_anchor_age_ms == 50 + assert result == (18, pytest.approx(50.45), pytest.approx(30.52)) + + +# ---------------------------------------------------------------------- +# OrthorectifierConfig validation (config block sanity). + + +def test_thresholds_reject_non_positive_cov_norm() -> None: + # Act / Assert + with pytest.raises(ValueError): + OrthorectifierThresholds(cov_norm_threshold=0.0, inlier_floor=10) + + +def test_thresholds_reject_negative_inlier_floor() -> None: + # Act / Assert + with pytest.raises(ValueError): + OrthorectifierThresholds(cov_norm_threshold=1.0, inlier_floor=-1)