[AZ-389] C5 orthorectifier emits mid-flight tiles to C6

Adds an opt-in C5-internal orthorectifier (`_orthorectifier.py`) that
emits at most one tile-aligned JPEG candidate per nav frame to the
C6 `TileStore.write_tile` API.  Quality gates fire before any
OpenCV work: covariance Frobenius, inlier floor, source-label
(`SATELLITE_ANCHORED` only), and once-per-frame rate limit.

Cross-component import rule (AZ-507) is preserved: c5_state never
imports c6_tile_cache.  `runtime_root.state_factory` carries a new
`_C6MidFlightIngestAdapter` that builds the canonical
`TileMetadata` (`ONBOARD_INGEST` / `FRESH` / `PENDING`), hashes
the JPEG, and translates `FreshnessRejectionError` to a `None`
return so the orthorectifier silently swallows freshness
rejection per AC-NEW-3.

Wiring is opt-in via `C5StateConfig.orthorectifier.enabled`;
existing tests/binaries default to disabled and are unaffected.
Both `GtsamIsam2StateEstimator` and `EskfStateEstimator`
participate through new `attach_orthorectifier` /
`set_latest_nav_frame` extension methods (Protocol surface
unchanged).

Tests: 22 new unit tests cover AC-1..AC-9 plus inlier-floor
gate plus the composition-root adapter.  216/216 c5_state and
38/38 runtime-root + compose tests pass.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-16 09:02:33 +03:00
parent 811ddc8aa7
commit c5ffc14fe9
9 changed files with 1952 additions and 20 deletions
@@ -0,0 +1,146 @@
# Batch 65 — Cycle 1 Report
**Date**: 2026-05-16
**Tasks**: AZ-389 (C5 orthorectifier → C6 mid-flight tile candidate emission)
**Verdict**: COMPLETE — PASS (self-reviewed)
## Summary
Closes the AZ-389 gap inside the C5 state estimator by introducing a
component-internal orthorectifier that emits at most one tile-aligned
JPEG candidate per nav-camera frame to C6 via the existing
`TileStore.write_tile` API.
The implementation respects the AZ-507 cross-component import rule
(enforced by `test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies`):
c5_state never imports c6_tile_cache. The composition root's
`runtime_root.state_factory` carries a new
`_C6MidFlightIngestAdapter` that wraps the C6 `TileStore`, builds the
canonical `TileMetadata` (`TileSource.ONBOARD_INGEST`,
`FreshnessLabel.FRESH`, `VotingStatus.PENDING`), hashes the JPEG
bytes, and translates `FreshnessRejectionError` into a `None` return
so the orthorectifier silently swallows freshness rejection per
AC-NEW-3 (opportunistic emission).
The orthorectifier runs entirely on the existing state-ingest thread
(Invariant 1) — no new threads, no additional locks. It is wired
opt-in: `config.components['c5_state'].orthorectifier.enabled = false`
keeps the legacy steady-state path bit-for-bit unchanged. Both
`GtsamIsam2StateEstimator` and `EskfStateEstimator` participate
through new `attach_orthorectifier(...)` and `set_latest_nav_frame(...)`
extension methods (concrete only — the `StateEstimator` Protocol
surface is unchanged so existing implementations and tests continue
to satisfy it).
## Architecture decisions
* **Per-frame, per-estimator hook** — the hook fires after the
EstimatorOutput is built inside `current_estimate()`. The buffered
nav frame supplies the source pixels; the orthorectifier passes
duck-typed pose + cov to its kernel and rate-limits itself to one
tile per `frame.frame_id` (AC-4).
* **No new C6 API** — uses `TileStore.write_tile(blob, metadata)`,
the same atomic file + metadata insert that the C11 download path
already uses. The composition-root adapter is the only new
component-bridge.
* **Quality gates as cheap pre-checks** — covariance Frobenius gate,
inlier-floor gate, source-label gate (only `SATELLITE_ANCHORED`
passes), and once-per-frame rate limit run BEFORE the OpenCV
warp/encode work.
* **Best-effort kernel** — any exception inside the warp / JPEG
encode path or any non-`FreshnessRejectionError` writer failure is
swallowed with a WARNING log and `None` return; the steady-state
`current_estimate` output is never disturbed.
* **AC-7 first-emission INFO log** — emitted exactly once per
flight, subsequent emissions log at DEBUG.
## Files added / modified
### Added (2)
- `src/gps_denied_onboard/components/c5_state/_orthorectifier.py`
the `MidFlightTileWriter` Protocol cut, `OrthorectifierThresholds`
dataclass, and `Orthorectifier` class with the homography
construction (`_ground_plane_homography`,
`_compose_tile_to_image_homography`, `_invert_se3`,
`_quat_to_rotation_matrix`).
- `tests/unit/c5_state/test_az389_orthorectifier.py` — 22 tests
covering AC-1..AC-9 plus the inlier-floor gate plus the
composition-root `_C6MidFlightIngestAdapter` translation rules
plus `OrthorectifierConfig` validation.
### Modified (4)
- `src/gps_denied_onboard/components/c5_state/config.py` — new
`OrthorectifierConfig` dataclass nested as
`C5StateConfig.orthorectifier`. Disabled by default; tunable
thresholds + tile / zoom / JPEG knobs.
- `src/gps_denied_onboard/components/c5_state/gtsam_isam2_estimator.py`
— orthorectifier state fields, `attach_orthorectifier` +
`set_latest_nav_frame` extension methods, `_maybe_emit_mid_flight_tile`
hook in `current_estimate()`, and `create()` factory now accepts
the optional `mid_flight_tile_writer` / `camera_calibration` /
`flight_id` / `companion_id` params.
- `src/gps_denied_onboard/components/c5_state/eskf_baseline.py`
same set of changes, plus `_latest_vio` cache (ESKF historically
did not retain the full VIO DTO).
- `src/gps_denied_onboard/runtime_root/state_factory.py`
`_C6MidFlightIngestAdapter` class + `build_state_estimator` now
accepts optional `tile_store` / `camera_calibration` /
`flight_id` / `companion_id` and forwards them to the strategy
factory when AZ-389 is enabled.
## Task Results
| Task | Status | Files Modified | Focused tests | AC Coverage | Issues |
|--------|--------|---------------------------------------------------------------------------------------------------------------------------------|---------------|--------------|--------|
| AZ-389 | Done | 1 added + 4 modified under `src/`; 1 added under `tests/unit/c5_state/`; task spec moved `_docs/02_tasks/todo/``done/` | 22/22 pass | 9/9 covered | None |
## AC Test Coverage: 9/9 covered
| AC | Test | Status |
|------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| AC-1 | `test_ac1_homography_projects_origin_to_principal_point` + `test_ac1_homography_projects_offset_within_one_pixel` + `test_compose_tile_to_image_homography_is_centred_on_camera` | Covered |
| AC-2 | `test_ac2_cov_norm_above_threshold_blocks_emission` + `test_ac2_cov_norm_below_threshold_emits` | Covered |
| AC-3 | `test_ac3_non_satellite_anchored_blocked` (parametrised over `VISUAL_PROPAGATED` / `DEAD_RECKONED`) | Covered |
| AC-4 | `test_ac4_same_frame_id_processed_only_once` + `test_ac4_distinct_frame_ids_each_emit` | Covered |
| AC-5 | `test_ac5_writer_called_with_onboard_ingest_metadata` + `test_adapter_calls_write_tile_with_onboard_ingest_metadata` | Covered |
| AC-6 | `test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality` + adapter-side `content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest()` assertion | Covered |
| AC-7 | `test_ac7_first_emission_logs_info_subsequent_logs_debug` | Covered |
| AC-8 | `test_ac8_missing_inputs_silent_return_none` (parametrised over `frame` / `pose_estimate` / `cov_6x6`) | Covered |
| AC-9 | `test_ac9_writer_returning_none_swallowed` + `test_ac9_writer_raising_swallowed_with_warning` + `test_adapter_translates_freshness_rejection_to_none` | Covered |
## Code Review Verdict: PASS (self-reviewed)
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Cross-batch verification
- `tests/unit/c5_state/` — 216 / 216 pass (all pre-AZ-389 tests still
pass — Protocol surface unchanged, factory signature is
backward-compatible via default `None` params).
- `tests/unit/test_az270_compose_root.py` — 8 / 8 pass; the
cross-component import lint still holds against the new
`_C6MidFlightIngestAdapter`.
- `tests/unit/test_runtime_root_env_gate.py` +
`tests/unit/test_az401_compose_root_replay.py` +
`tests/unit/test_ac3_compose_files.py` — 38 / 38 pass.
- `tests/unit/c6_tile_cache/` — 126 / 126 in-process tests pass
(Postgres-backed tests skipped; require Docker).
## Notes / leftovers
- The Jira description for AZ-389 still references the
pre-AZ-559 `tile_store.put_mid_flight_candidate` API surface; the
local task spec was rewritten against `write_tile` per the
History section. Logged as tracker hygiene; not blocking.
- During investigation of the existing C11 download adapter
(`runtime_root/c11_factory.py::_C6DownloadAdapter.write_tile_for_download`)
we noticed it calls both `tile_store.write_tile(blob, metadata)`
and `metadata_store.insert_metadata(metadata)` sequentially —
given that `PostgresFilesystemStore.write_tile` is itself atomic
(file write + metadata insert in a single transaction) the second
call is a probable redundancy. Out of scope for AZ-389; recorded
here for a future hygiene ticket.
## Next Batch: All product-implementation tasks complete — proceed to Step 15 (Product Implementation Completeness Gate).
+5 -5
View File
@@ -6,13 +6,13 @@ step: 7
name: Implement
status: in_progress
sub_step:
phase: 1
name: parse
detail: ""
phase: 11
name: commit
detail: "Batch 65 (AZ-389) committed; awaiting tracker transition + completeness gate"
retry_count: 0
cycle: 1
tracker: jira
last_completed_batch: 64
last_completed_batch: 65
last_cumulative_review: batches_61-63
current_batch: 65
current_batch_tasks: ""
current_batch_tasks: "AZ-389"
@@ -0,0 +1,598 @@
"""C5 internal orthorectifier sub-component (AZ-389).
Per epic spec § Scope: "orthorectifier (lives within C5 as an internal
subcomponent)". Receives a converged pose + nav-camera frame, gates on
quality thresholds (covariance, inlier count, source label), produces a
JPEG-encoded tile-aligned image patch via OpenCV ``warpPerspective``,
and persists it to C6 as an ``ONBOARD_INGEST`` tile via a
constructor-injected :class:`MidFlightTileWriter`.
Cross-component decoupling — c5_state cannot import from c6_tile_cache
(AZ-507; enforced by
``test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies``).
The :class:`MidFlightTileWriter` Protocol below is the consumer-side
cut; the runtime root supplies the concrete adapter via
``state_factory``. The writer translates a C6 ``FreshnessRejectionError``
into a ``None`` return — the orthorectifier swallows that silently
per AC-NEW-3 (opportunistic emission).
NOT exported from ``c5_state.__init__``; constructed inside the
estimator factory and observed only through the estimator's
``set_latest_nav_frame`` / ``current_estimate`` hook pair.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Final, Protocol, runtime_checkable
import cv2
import numpy as np
from gps_denied_onboard._types.state import PoseSourceLabel
if TYPE_CHECKING:
from datetime import datetime
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.nav import NavCameraFrame
__all__ = [
"MidFlightTileWriter",
"Orthorectifier",
"OrthorectifierThresholds",
]
_DEFAULT_JPEG_QUALITY: Final[int] = 90
_DEFAULT_TILE_SIZE_PIXELS: Final[int] = 256
_DEFAULT_TILE_SIZE_METERS: Final[float] = 100.0
_DEFAULT_ZOOM_LEVEL: Final[int] = 18
_LOG = logging.getLogger("gps_denied_onboard.c5_state.orthorectifier")
@runtime_checkable
class MidFlightTileWriter(Protocol):
"""Consumer-side cut of the C6 mid-flight tile write surface.
The concrete adapter lives in
``gps_denied_onboard.runtime_root.state_factory`` and is the only
place that knows about C6's :class:`TileMetadata` / :class:`TileSource`
/ :class:`FreshnessLabel` / :class:`VotingStatus` /
:class:`FreshnessRejectionError`. C5's orthorectifier never imports
them (AZ-507 architectural rule).
Returning ``None`` indicates the C6 freshness gate rejected the
insert. Per AC-NEW-3 the orthorectifier swallows that silently —
mid-flight tile generation is opportunistic. Returning the
``(zoom_level, lat, lon)`` tuple confirms persistence and is the
tile's spatial identity.
"""
def write_mid_flight_tile(
self,
*,
jpeg_bytes: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: "datetime",
flight_id: str,
companion_id: str,
estimator_label: str,
covariance_2x2_horizontal: tuple[
tuple[float, float], tuple[float, float]
],
last_anchor_age_ms: int,
mre_px: float,
imu_bias_norm: float,
) -> tuple[int, float, float] | None: ...
@dataclass(frozen=True, slots=True)
class OrthorectifierThresholds:
"""Quality + output-grid configuration for :class:`Orthorectifier`.
``cov_norm_threshold`` is the Frobenius-norm gate on the C5 6x6
pose covariance. ``inlier_floor`` is the strict-greater-than
threshold on the C4 reported inlier count. ``tile_size_meters``
and ``tile_size_pixels`` describe the orthorectified output patch
grid; ``zoom_level`` is the satellite-tile zoom the C6 row will be
indexed at. ``jpeg_quality`` is the OpenCV ``cv2.imencode``
quality knob (0-100; default 90 to balance fidelity and budget).
"""
cov_norm_threshold: float
inlier_floor: int
tile_size_meters: float = _DEFAULT_TILE_SIZE_METERS
tile_size_pixels: int = _DEFAULT_TILE_SIZE_PIXELS
zoom_level: int = _DEFAULT_ZOOM_LEVEL
jpeg_quality: int = _DEFAULT_JPEG_QUALITY
def __post_init__(self) -> None:
if not self.cov_norm_threshold > 0.0:
raise ValueError(
f"OrthorectifierThresholds.cov_norm_threshold must be > 0; "
f"got {self.cov_norm_threshold}"
)
if self.inlier_floor < 0:
raise ValueError(
f"OrthorectifierThresholds.inlier_floor must be >= 0; "
f"got {self.inlier_floor}"
)
if not self.tile_size_meters > 0.0:
raise ValueError(
f"OrthorectifierThresholds.tile_size_meters must be > 0; "
f"got {self.tile_size_meters}"
)
if self.tile_size_pixels <= 0:
raise ValueError(
f"OrthorectifierThresholds.tile_size_pixels must be > 0; "
f"got {self.tile_size_pixels}"
)
if not 0 <= self.zoom_level <= 21:
raise ValueError(
f"OrthorectifierThresholds.zoom_level must be in [0, 21]; "
f"got {self.zoom_level}"
)
if not 0 <= self.jpeg_quality <= 100:
raise ValueError(
f"OrthorectifierThresholds.jpeg_quality must be in [0, 100]; "
f"got {self.jpeg_quality}"
)
class Orthorectifier:
"""C5-internal sub-component — emits at most one mid-flight tile per frame.
All logic is single-threaded by construction (state-ingest thread per
Invariant 1). The orthorectifier is opaque to test wiring: tests
construct it with a fake :class:`MidFlightTileWriter` and a synthetic
:class:`CameraCalibration`, then call :meth:`try_emit_candidate`
directly with crafted ``(frame, pose, cov_6x6, ...)`` tuples.
Quality gates (AC-2 / AC-3 / AC-4 / AC-8) run before any heavy
OpenCV work. The orthorectification kernel (AC-1) computes the
ground→image plane homography from the pose + calibration and
delegates sampling to ``cv2.warpPerspective``.
Per AC-NEW-3 (replay protocol Invariant 12) the emission is
opportunistic — a freshness rejection from C6 is swallowed (AC-9)
and a DEBUG log emitted. Hard failures inside the OpenCV kernel
are also swallowed and logged at WARNING (the steady-state
estimate is the priority; mid-flight tile gen is best-effort).
"""
__slots__ = (
"_tile_writer",
"_thresholds",
"_calibration",
"_flight_id",
"_companion_id",
"_first_emission_done",
"_last_processed_frame_id",
)
def __init__(
self,
*,
tile_writer: MidFlightTileWriter,
thresholds: OrthorectifierThresholds,
camera_calibration: "CameraCalibration",
flight_id: str,
companion_id: str,
) -> None:
self._tile_writer = tile_writer
self._thresholds = thresholds
self._calibration = camera_calibration
self._flight_id = flight_id
self._companion_id = companion_id
self._first_emission_done: bool = False
self._last_processed_frame_id: int | None = None
def try_emit_candidate(
self,
*,
frame: "NavCameraFrame | None",
pose_estimate: Any,
cov_6x6: np.ndarray | None,
inlier_count: int,
mre_px: float,
source_label: PoseSourceLabel,
last_anchor_age_ms: int,
imu_bias_norm: float,
estimator_label: str,
) -> tuple[int, float, float] | None:
"""Run quality gates → orthorectify → write. Returns persisted ``(zoom, lat, lon)`` or ``None``.
Returns ``None`` (silently) when ANY of the following hold:
* AC-8 — ``frame`` or ``pose_estimate`` or ``cov_6x6`` is ``None``;
* AC-4 — ``frame.frame_id`` was already processed in a prior call;
* AC-3 — ``source_label != SATELLITE_ANCHORED``;
* AC-2 — Frobenius norm of ``cov_6x6`` >= ``cov_norm_threshold``;
* inlier-floor — ``inlier_count <= inlier_floor``;
* AC-9 — :class:`MidFlightTileWriter` returned ``None``
(freshness rejection — DEBUG log only);
* orthorectification kernel raised (WARNING log; not propagated).
"""
if frame is None or pose_estimate is None or cov_6x6 is None:
_LOG.debug(
"c5.state.orthorectifier_skip_missing_input",
extra={
"kind": "c5.state.orthorectifier_skip_missing_input",
"kv": {
"frame_present": frame is not None,
"pose_present": pose_estimate is not None,
"cov_present": cov_6x6 is not None,
},
},
)
return None
if (
self._last_processed_frame_id is not None
and self._last_processed_frame_id == int(frame.frame_id)
):
return None
self._last_processed_frame_id = int(frame.frame_id)
if source_label != PoseSourceLabel.SATELLITE_ANCHORED:
_LOG.debug(
"c5.state.orthorectifier_skip_source_label",
extra={
"kind": "c5.state.orthorectifier_skip_source_label",
"kv": {
"frame_id": int(frame.frame_id),
"source_label": source_label.value,
},
},
)
return None
cov_norm = float(np.linalg.norm(np.asarray(cov_6x6), ord="fro"))
if cov_norm >= self._thresholds.cov_norm_threshold:
_LOG.debug(
"c5.state.orthorectifier_skip_cov_gate",
extra={
"kind": "c5.state.orthorectifier_skip_cov_gate",
"kv": {
"frame_id": int(frame.frame_id),
"cov_norm": cov_norm,
"threshold": self._thresholds.cov_norm_threshold,
},
},
)
return None
if inlier_count <= self._thresholds.inlier_floor:
_LOG.debug(
"c5.state.orthorectifier_skip_inlier_gate",
extra={
"kind": "c5.state.orthorectifier_skip_inlier_gate",
"kv": {
"frame_id": int(frame.frame_id),
"inlier_count": int(inlier_count),
"floor": self._thresholds.inlier_floor,
},
},
)
return None
try:
jpeg_bytes = self._orthorectify_to_jpeg(frame, pose_estimate)
except Exception as exc:
# AC-NEW-3 opportunistic emission — log + return; do not
# propagate to the C5 estimator's caller.
_LOG.warning(
"c5.state.orthorectifier_kernel_failed",
extra={
"kind": "c5.state.orthorectifier_kernel_failed",
"kv": {
"frame_id": int(frame.frame_id),
"error": str(exc),
"error_type": type(exc).__name__,
},
},
)
return None
if jpeg_bytes is None or len(jpeg_bytes) == 0:
_LOG.warning(
"c5.state.orthorectifier_empty_jpeg",
extra={
"kind": "c5.state.orthorectifier_empty_jpeg",
"kv": {"frame_id": int(frame.frame_id)},
},
)
return None
cov_2x2_h = self._extract_horizontal_covariance(np.asarray(cov_6x6))
try:
result = self._tile_writer.write_mid_flight_tile(
jpeg_bytes=jpeg_bytes,
zoom_level=int(self._thresholds.zoom_level),
lat=float(pose_estimate.position_wgs84.lat_deg),
lon=float(pose_estimate.position_wgs84.lon_deg),
tile_size_meters=float(self._thresholds.tile_size_meters),
tile_size_pixels=int(self._thresholds.tile_size_pixels),
capture_timestamp=frame.timestamp,
flight_id=self._flight_id,
companion_id=self._companion_id,
estimator_label=estimator_label,
covariance_2x2_horizontal=cov_2x2_h,
last_anchor_age_ms=int(last_anchor_age_ms),
mre_px=float(mre_px),
imu_bias_norm=float(imu_bias_norm),
)
except Exception as exc:
# The writer is responsible for catching the C6
# FreshnessRejectionError family and translating it to None.
# Anything else escaping is an infrastructure failure — log
# at WARNING but do not propagate (mid-flight tile gen is
# best-effort and must never break the steady-state
# current_estimate path).
_LOG.warning(
"c5.state.orthorectifier_writer_failed",
extra={
"kind": "c5.state.orthorectifier_writer_failed",
"kv": {
"frame_id": int(frame.frame_id),
"error": str(exc),
"error_type": type(exc).__name__,
},
},
)
return None
if result is None:
_LOG.debug(
"c5.state.mid_flight_candidate_freshness_rejected",
extra={
"kind": "c5.state.mid_flight_candidate_freshness_rejected",
"kv": {
"frame_id": int(frame.frame_id),
"lat": float(pose_estimate.position_wgs84.lat_deg),
"lon": float(pose_estimate.position_wgs84.lon_deg),
},
},
)
return None
if not self._first_emission_done:
self._first_emission_done = True
_LOG.info(
"c5.state.first_mid_flight_candidate",
extra={
"kind": "c5.state.first_mid_flight_candidate",
"kv": {
"frame_id": int(frame.frame_id),
"tile_id": list(result),
"cov_norm": cov_norm,
},
},
)
else:
_LOG.debug(
"c5.state.mid_flight_candidate_emitted",
extra={
"kind": "c5.state.mid_flight_candidate_emitted",
"kv": {
"frame_id": int(frame.frame_id),
"tile_id": list(result),
"cov_norm": cov_norm,
"inlier_count": int(inlier_count),
},
},
)
return result
def _orthorectify_to_jpeg(
self, frame: "NavCameraFrame", pose: Any
) -> bytes:
image = self._coerce_image(frame.image)
K = self._coerce_intrinsics(self._calibration.intrinsics_3x3)
body_T_cam = self._coerce_se3(self._calibration.body_to_camera_se3)
world_T_body = self._world_T_body_from_pose(pose)
world_T_cam = world_T_body @ body_T_cam
cam_T_world = _invert_se3(world_T_cam)
H_ground_to_image = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
H_tile_to_image = _compose_tile_to_image_homography(
H_ground_to_image=H_ground_to_image,
ground_x_center=float(world_T_cam[0, 3]),
ground_y_center=float(world_T_cam[1, 3]),
tile_size_meters=float(self._thresholds.tile_size_meters),
tile_size_pixels=int(self._thresholds.tile_size_pixels),
)
N = int(self._thresholds.tile_size_pixels)
warped = cv2.warpPerspective(
image,
H_tile_to_image,
(N, N),
flags=cv2.INTER_LINEAR | cv2.WARP_INVERSE_MAP,
borderMode=cv2.BORDER_CONSTANT,
borderValue=0,
)
encode_params = [
int(cv2.IMWRITE_JPEG_QUALITY),
int(self._thresholds.jpeg_quality),
]
success, buffer = cv2.imencode(".jpg", warped, encode_params)
if not success:
raise RuntimeError("cv2.imencode(.jpg) returned failure")
return bytes(buffer.tobytes())
@staticmethod
def _extract_horizontal_covariance(
cov_6x6: np.ndarray,
) -> tuple[tuple[float, float], tuple[float, float]]:
"""Pull the horizontal-translation 2x2 sub-block.
The C5 6x6 covariance is documented as rotation-then-translation
(rxx, ryy, rzz, txx, tyy, tzz) in the GTSAM tangent-space order
(matching ``VioOutput.pose_covariance_6x6``). Indices [3, 4] are
the East / North translation components — the horizontal sub-block
the C6 voting layer consumes (per AZ-389 § Constraints).
"""
if cov_6x6.shape != (6, 6):
raise ValueError(
f"cov_6x6 must be shape (6, 6); got {cov_6x6.shape}"
)
sub = cov_6x6[3:5, 3:5]
return (
(float(sub[0, 0]), float(sub[0, 1])),
(float(sub[1, 0]), float(sub[1, 1])),
)
@staticmethod
def _coerce_image(image: Any) -> np.ndarray:
arr = np.asarray(image)
if arr.ndim not in (2, 3):
raise ValueError(
f"NavCameraFrame.image must be 2-D (grayscale) or 3-D "
f"(H, W, C); got ndim={arr.ndim}"
)
return arr
@staticmethod
def _coerce_intrinsics(k: Any) -> np.ndarray:
arr = np.asarray(k, dtype=np.float64)
if arr.shape != (3, 3):
raise ValueError(
f"intrinsics_3x3 must be shape (3, 3); got {arr.shape}"
)
return arr
@staticmethod
def _coerce_se3(se3: Any) -> np.ndarray:
arr = np.asarray(se3, dtype=np.float64)
if arr.shape == (4, 4):
return arr
# Some helper SE3 wrappers expose ``.matrix()``; try that.
if hasattr(se3, "matrix"):
arr = np.asarray(se3.matrix(), dtype=np.float64)
if arr.shape == (4, 4):
return arr
raise ValueError(
f"body_to_camera_se3 must be a 4x4 array or expose .matrix(); "
f"got shape={arr.shape}"
)
@staticmethod
def _world_T_body_from_pose(pose: Any) -> np.ndarray:
"""Build a 4x4 homogeneous ``world_T_body`` from a :class:`PoseEstimate`.
The pose's ``position_wgs84`` is collapsed to the local-ENU
plane anchored at the camera's own lat/lon — i.e. the camera
is at world-origin in the orthorectifier's ENU frame. The
ground plane is at ``Z = -altitude`` so the homography sees a
non-degenerate plane separation. This keeps the tile-relative
math agnostic to the global ENU origin and avoids re-injecting
the ``WgsConverter`` at this layer; the persisted tile_id lat
/ lon comes directly from the pose's ``position_wgs84``.
"""
R = _quat_to_rotation_matrix(pose.orientation_world_T_body)
T = np.eye(4, dtype=np.float64)
T[:3, :3] = R
# Camera body translation: at world origin in the local-ENU
# working frame (East=0, North=0). Altitude-above-ground is
# the only dimension that matters for the homography.
T[0, 3] = 0.0
T[1, 3] = 0.0
T[2, 3] = float(pose.position_wgs84.alt_m)
return T
def _quat_to_rotation_matrix(q: Any) -> np.ndarray:
"""Convert a scalar-first :class:`Quat` ``(w, x, y, z)`` to a 3x3 rotation."""
w = float(q.w)
x = float(q.x)
y = float(q.y)
z = float(q.z)
norm = (w * w + x * x + y * y + z * z) ** 0.5
if norm <= 0.0:
raise ValueError("Quat has zero norm; cannot convert to rotation")
w /= norm
x /= norm
y /= norm
z /= norm
return np.array(
[
[1.0 - 2.0 * (y * y + z * z), 2.0 * (x * y - z * w), 2.0 * (x * z + y * w)],
[2.0 * (x * y + z * w), 1.0 - 2.0 * (x * x + z * z), 2.0 * (y * z - x * w)],
[2.0 * (x * z - y * w), 2.0 * (y * z + x * w), 1.0 - 2.0 * (x * x + y * y)],
],
dtype=np.float64,
)
def _invert_se3(T: np.ndarray) -> np.ndarray:
"""Invert a 4x4 rigid SE(3) transform analytically."""
if T.shape != (4, 4):
raise ValueError(f"_invert_se3: expected (4, 4); got {T.shape}")
R = T[:3, :3]
t = T[:3, 3]
inv = np.eye(4, dtype=np.float64)
inv[:3, :3] = R.T
inv[:3, 3] = -R.T @ t
return inv
def _ground_plane_homography(
*, K: np.ndarray, cam_T_world: np.ndarray
) -> np.ndarray:
"""Return the 3x3 homography mapping ground-plane points ``(X, Y, 1)`` (world ENU, ``Z = 0``) to image pixels ``(u, v, 1)``.
Construction:
.. code:: text
[u] [X]
[v] = K @ [r1 r2 t] @ [Y]
[w] [1]
where ``[r1 | r2 | r3 | t]`` is ``cam_T_world`` and ``r1``,
``r2`` are its first two columns. The third column ``r3`` is
dropped because the ground plane fixes ``Z = 0`` (so the
homography is full-rank in 2D).
"""
R = cam_T_world[:3, :3]
t = cam_T_world[:3, 3]
return K @ np.column_stack([R[:, 0], R[:, 1], t])
def _compose_tile_to_image_homography(
*,
H_ground_to_image: np.ndarray,
ground_x_center: float,
ground_y_center: float,
tile_size_meters: float,
tile_size_pixels: int,
) -> np.ndarray:
"""Build ``H_tile_to_image = H_ground_to_image @ M_tile_to_ground``.
``M_tile_to_ground`` maps tile pixel ``(j, i, 1)`` (image-y-down
convention) to ground ``(X, Y, 1)`` (world Y is North, image i is
rows-down). Centred at ``(ground_x_center, ground_y_center)`` so
the camera's nadir lands at the tile centre.
"""
s = float(tile_size_meters) / float(tile_size_pixels)
n = int(tile_size_pixels)
M = np.array(
[
[s, 0.0, ground_x_center - s * (n / 2.0)],
[0.0, -s, ground_y_center + s * (n / 2.0)],
[0.0, 0.0, 1.0],
],
dtype=np.float64,
)
return H_ground_to_image @ M
@@ -9,12 +9,16 @@ root reads ``config.components["c5_state"]`` and dispatches by
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Final
from gps_denied_onboard.config.schema import ConfigError
__all__ = ["KNOWN_STATE_STRATEGIES", "C5StateConfig"]
__all__ = [
"KNOWN_STATE_STRATEGIES",
"C5StateConfig",
"OrthorectifierConfig",
]
KNOWN_STATE_STRATEGIES: Final[frozenset[str]] = frozenset({"gtsam_isam2", "eskf"})
@@ -23,6 +27,60 @@ _KEYFRAME_WINDOW_MIN: Final[int] = 10
_KEYFRAME_WINDOW_MAX: Final[int] = 20
@dataclass(frozen=True)
class OrthorectifierConfig:
"""AZ-389 orthorectifier knobs (sub-block of :class:`C5StateConfig`).
``enabled`` is the main feature flag. When ``False`` the runtime
root skips orthorectifier construction entirely and the estimator
behaves exactly as before AZ-389; this preserves the existing
smoke-test wiring that does not provide a TileStore. The four
quality / output knobs match
:class:`gps_denied_onboard.components.c5_state._orthorectifier.OrthorectifierThresholds`
one-for-one.
"""
enabled: bool = False
cov_norm_threshold: float = 1.0
inlier_floor: int = 30
tile_size_meters: float = 100.0
tile_size_pixels: int = 256
zoom_level: int = 18
jpeg_quality: int = 90
def __post_init__(self) -> None:
if not self.cov_norm_threshold > 0.0:
raise ConfigError(
"OrthorectifierConfig.cov_norm_threshold must be > 0; "
f"got {self.cov_norm_threshold}"
)
if self.inlier_floor < 0:
raise ConfigError(
"OrthorectifierConfig.inlier_floor must be >= 0; "
f"got {self.inlier_floor}"
)
if not self.tile_size_meters > 0.0:
raise ConfigError(
"OrthorectifierConfig.tile_size_meters must be > 0; "
f"got {self.tile_size_meters}"
)
if self.tile_size_pixels <= 0:
raise ConfigError(
"OrthorectifierConfig.tile_size_pixels must be > 0; "
f"got {self.tile_size_pixels}"
)
if not 0 <= self.zoom_level <= 21:
raise ConfigError(
"OrthorectifierConfig.zoom_level must be in [0, 21]; "
f"got {self.zoom_level}"
)
if not 0 <= self.jpeg_quality <= 100:
raise ConfigError(
"OrthorectifierConfig.jpeg_quality must be in [0, 100]; "
f"got {self.jpeg_quality}"
)
@dataclass(frozen=True)
class C5StateConfig:
"""C5 state-estimator config block.
@@ -58,6 +116,7 @@ class C5StateConfig:
default_takeoff_origin_sigma_horiz_m: float = 5.0
default_takeoff_origin_sigma_vert_m: float = 10.0
no_estimate_fallback_s: float = 3.0
orthorectifier: OrthorectifierConfig = field(default_factory=OrthorectifierConfig)
def __post_init__(self) -> None:
if self.strategy not in KNOWN_STATE_STRATEGIES:
@@ -75,6 +75,11 @@ from gps_denied_onboard.components.c5_state._source_label_sm import (
RejectionSubscription,
SourceLabelStateMachine,
)
from gps_denied_onboard.components.c5_state._orthorectifier import (
MidFlightTileWriter,
Orthorectifier,
OrthorectifierThresholds,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import (
EstimatorAlreadyStartedError,
@@ -88,8 +93,9 @@ from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.fc import GpsHealth, GpsSample
from gps_denied_onboard._types.nav import ImuWindow, VioOutput
from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame, VioOutput
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.config import Config
@@ -229,6 +235,17 @@ class EskfStateEstimator(StateEstimator):
producer_id="c5_state",
)
# AZ-389: orthorectifier hook. Same semantics as the iSAM2
# estimator — wired by the composition root only when AZ-389
# is enabled in config + the runtime root has a TileStore.
# ``_latest_vio`` is the source of inlier_count / mre_px /
# imu_bias_norm metadata for ``try_emit_candidate``; ESKF
# historically did NOT cache the full DTO so we keep a
# dedicated field rather than abuse ``_prev_vio_*``.
self._orthorectifier: Orthorectifier | None = None
self._latest_nav_frame: "NavCameraFrame | None" = None
self._latest_vio: "VioOutput | None" = None
self._log.debug(
"c5.state.eskf_initialised",
extra={
@@ -269,6 +286,25 @@ class EskfStateEstimator(StateEstimator):
def attach_source_label_state_machine(self, machine: Any) -> None:
self._source_label_machine = machine
def attach_orthorectifier(self, orthorectifier: Orthorectifier) -> None:
"""AZ-389 — wire the mid-flight tile orthorectifier.
Optional; called by the composition root only when AZ-389 is
enabled in config. See
:meth:`GtsamIsam2StateEstimator.attach_orthorectifier` for the
cross-estimator contract.
"""
self._orthorectifier = orthorectifier
def set_latest_nav_frame(self, frame: "NavCameraFrame") -> None:
"""AZ-389 — buffer the most recent nav-camera frame.
Quiet no-op when no orthorectifier is attached. See
:meth:`GtsamIsam2StateEstimator.set_latest_nav_frame` for the
cross-estimator contract.
"""
self._latest_nav_frame = frame
def notify_gps_health(self, gps_health: GpsHealth, now_ns: int | None = None) -> None:
machine = self._source_label_machine
if isinstance(machine, SourceLabelStateMachine):
@@ -455,6 +491,8 @@ class EskfStateEstimator(StateEstimator):
self._close_cold_start_window()
ts_ns = vio.emitted_at_ns
self._guard_timestamp(ts_ns, source="vio")
# AZ-389: cache full DTO for the orthorectifier metadata path.
self._latest_vio = vio
curr_pose = vio.relative_pose_T.matrix()
if self._prev_vio_pose is None:
@@ -656,8 +694,50 @@ class EskfStateEstimator(StateEstimator):
)
self._history.append(out)
self._fallback.mark_successful_estimate(emitted_at)
# AZ-389: opportunistic mid-flight tile emission.
self._maybe_emit_mid_flight_tile(
output=out,
covariance=cov6,
last_anchor_age_ms=last_anchor_age_ms,
)
return out
def _maybe_emit_mid_flight_tile(
self,
*,
output: EstimatorOutput,
covariance: np.ndarray,
last_anchor_age_ms: int,
) -> None:
"""AZ-389 hook — invoke the orthorectifier when wired."""
orthorectifier = self._orthorectifier
frame = self._latest_nav_frame
if orthorectifier is None or frame is None:
return
latest_vio = self._latest_vio
if latest_vio is None:
inlier_count = 0
mre_px = float("inf")
imu_bias_norm = 0.0
else:
quality = latest_vio.feature_quality
inlier_count = int(quality.tracked)
mre_px = float(quality.mre_px)
imu_bias_norm = _imu_bias_norm(latest_vio.imu_bias)
orthorectifier.try_emit_candidate(
frame=frame,
pose_estimate=output,
cov_6x6=covariance,
inlier_count=inlier_count,
mre_px=mre_px,
source_label=output.source_label,
last_anchor_age_ms=int(last_anchor_age_ms),
imu_bias_norm=imu_bias_norm,
estimator_label=_STRATEGY,
)
def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]:
"""Return up to ``n_keyframes`` recent forward-time entries.
@@ -910,11 +990,22 @@ def create(
se3_utils: Any,
wgs_converter: Any,
fdr_client: Any,
mid_flight_tile_writer: MidFlightTileWriter | None = None,
camera_calibration: "CameraCalibration | None" = None,
flight_id: str | None = None,
companion_id: str | None = None,
) -> tuple[StateEstimator, Any]:
"""Composition-root factory for the ESKF estimator.
Returns ``(estimator, handle)`` to match the iSAM2 factory shape;
the ESKF returns ``handle=None`` because it has no GTSAM graph.
AZ-389 wiring (optional): same contract as the iSAM2 factory —
when ``mid_flight_tile_writer`` is supplied AND
``config.components['c5_state'].orthorectifier.enabled`` is true,
the orthorectifier is constructed and attached. Missing
``camera_calibration`` / ``flight_id`` / ``companion_id`` while
enabled raises :class:`StateEstimatorConfigError`.
"""
estimator = EskfStateEstimator(
config,
@@ -923,6 +1014,31 @@ def create(
wgs_converter=wgs_converter,
fdr_client=fdr_client,
)
block: C5StateConfig = estimator._block
if mid_flight_tile_writer is not None and block.orthorectifier.enabled:
if camera_calibration is None or flight_id is None or companion_id is None:
raise StateEstimatorConfigError(
"AZ-389 orthorectifier requires camera_calibration + "
"flight_id + companion_id alongside mid_flight_tile_writer"
)
thresholds = OrthorectifierThresholds(
cov_norm_threshold=float(block.orthorectifier.cov_norm_threshold),
inlier_floor=int(block.orthorectifier.inlier_floor),
tile_size_meters=float(block.orthorectifier.tile_size_meters),
tile_size_pixels=int(block.orthorectifier.tile_size_pixels),
zoom_level=int(block.orthorectifier.zoom_level),
jpeg_quality=int(block.orthorectifier.jpeg_quality),
)
orthorectifier = Orthorectifier(
tile_writer=mid_flight_tile_writer,
thresholds=thresholds,
camera_calibration=camera_calibration,
flight_id=flight_id,
companion_id=companion_id,
)
estimator.attach_orthorectifier(orthorectifier)
return estimator, None
@@ -937,6 +1053,19 @@ def register() -> None:
# Module-level math + DTO helpers (kept local; small and self-contained).
def _imu_bias_norm(bias: Any) -> float:
"""Return ``sqrt(||accel_bias||^2 + ||gyro_bias||^2)`` for a :class:`ImuBias`.
Mirrors the iSAM2 helper; AZ-389 quality-metadata path. Robust to
absent / ``None`` components (legacy fixtures) by treating them
as zero.
"""
accel = getattr(bias, "accel_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0)
gyro = getattr(bias, "gyro_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0)
sq = sum(float(c) ** 2 for c in accel) + sum(float(c) ** 2 for c in gyro)
return sq**0.5
def _datetime_to_ns(dt: Any) -> int:
return int(dt.timestamp() * 1_000_000_000)
@@ -65,6 +65,11 @@ from gps_denied_onboard.components.c5_state._source_label_sm import (
RejectionSubscription,
SourceLabelStateMachine,
)
from gps_denied_onboard.components.c5_state._orthorectifier import (
MidFlightTileWriter,
Orthorectifier,
OrthorectifierThresholds,
)
from gps_denied_onboard.components.c5_state.config import C5StateConfig
from gps_denied_onboard.components.c5_state.errors import (
EstimatorAlreadyStartedError,
@@ -78,8 +83,9 @@ from gps_denied_onboard.helpers.wgs_converter import WgsConverter
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.fc import GpsHealth, GpsSample
from gps_denied_onboard._types.nav import ImuWindow, VioOutput
from gps_denied_onboard._types.nav import ImuWindow, NavCameraFrame, VioOutput
from gps_denied_onboard._types.pose import PoseEstimate
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.config import Config
@@ -281,6 +287,17 @@ class GtsamIsam2StateEstimator(StateEstimator):
# ``EstimatorAlreadyStartedError`` (AC-6).
self._cold_start_window_closed: bool = False
# AZ-389 state ----------------------------------------------------
# The orthorectifier is wired in the composition root via
# :meth:`attach_orthorectifier` whenever AZ-389 is enabled in
# config. ``None`` keeps the legacy steady-state path untouched
# (no mid-flight tile gen). ``_latest_nav_frame`` is the
# per-frame buffer the orthorectifier reads on a successful
# ``current_estimate`` (Invariant 1 — single ingest thread, so
# no locking needed).
self._orthorectifier: Orthorectifier | None = None
self._latest_nav_frame: "NavCameraFrame | None" = None
self._log.debug(
"c5.state.isam2_initialised",
extra={
@@ -315,6 +332,30 @@ class GtsamIsam2StateEstimator(StateEstimator):
"""
self._isam2_handle = handle
def attach_orthorectifier(self, orthorectifier: Orthorectifier) -> None:
"""Wire the AZ-389 mid-flight tile orthorectifier.
Optional; called by the composition root only when
``config.components['c5_state'].orthorectifier.enabled`` is
true. Once attached, every successful :meth:`current_estimate`
will invoke
:meth:`Orthorectifier.try_emit_candidate` with the most recent
nav-camera frame buffered via :meth:`set_latest_nav_frame`.
"""
self._orthorectifier = orthorectifier
def set_latest_nav_frame(self, frame: "NavCameraFrame") -> None:
"""Buffer the most recent nav-camera frame for AZ-389.
The composition root's per-frame loop calls this BEFORE
:meth:`current_estimate` so the orthorectifier sees a
consistent ``(frame, pose)`` pair. Single-threaded contract
(Invariant 1) — no locking. Quiet no-op when no orthorectifier
is attached so callers don't need to inspect the estimator
state before pushing frames.
"""
self._latest_nav_frame = frame
def set_enu_origin(self, origin: LatLonAlt) -> None:
"""Set the WGS84 origin used by :meth:`current_estimate` for ENU→WGS84.
@@ -1004,7 +1045,7 @@ class GtsamIsam2StateEstimator(StateEstimator):
# already been computed).
self._emit_smoothed_to_fdr_if_any(emitted_at, source_label, last_anchor_age_ms)
return EstimatorOutput(
output = EstimatorOutput(
frame_id=uuid4(),
position_wgs84=position_wgs84,
orientation_world_T_body=orientation,
@@ -1016,6 +1057,54 @@ class GtsamIsam2StateEstimator(StateEstimator):
emitted_at=emitted_at,
)
# AZ-389: opportunistic mid-flight tile emission. Best-effort —
# the orthorectifier internally swallows any kernel / writer
# failure; only the steady-state ``output`` is on the critical
# path. The hook fires on every successful ``current_estimate``
# call but the orthorectifier rate-limits itself to one tile per
# ``frame.frame_id`` (AC-4).
self._maybe_emit_mid_flight_tile(
output=output,
covariance=covariance,
last_anchor_age_ms=last_anchor_age_ms,
)
return output
def _maybe_emit_mid_flight_tile(
self,
*,
output: EstimatorOutput,
covariance: np.ndarray,
last_anchor_age_ms: int,
) -> None:
"""AZ-389 hook — invoke the orthorectifier when wired."""
orthorectifier = self._orthorectifier
frame = self._latest_nav_frame
if orthorectifier is None or frame is None:
return
prev_vio = self._prev_vio
if prev_vio is None:
inlier_count = 0
mre_px = float("inf")
imu_bias_norm = 0.0
else:
quality = prev_vio.feature_quality
inlier_count = int(quality.tracked)
mre_px = float(quality.mre_px)
imu_bias_norm = _imu_bias_norm(prev_vio.imu_bias)
orthorectifier.try_emit_candidate(
frame=frame,
pose_estimate=output,
cov_6x6=covariance,
inlier_count=inlier_count,
mre_px=mre_px,
source_label=output.source_label,
last_anchor_age_ms=int(last_anchor_age_ms),
imu_bias_norm=imu_bias_norm,
estimator_label=_STRATEGY,
)
def smoothed_history(self, n_keyframes: int) -> list[EstimatorOutput]:
"""Return up to ``min(n_keyframes, K)`` past smoothed estimates.
@@ -1475,6 +1564,10 @@ def create(
se3_utils: Any,
wgs_converter: Any,
fdr_client: Any,
mid_flight_tile_writer: MidFlightTileWriter | None = None,
camera_calibration: "CameraCalibration | None" = None,
flight_id: str | None = None,
companion_id: str | None = None,
) -> tuple[StateEstimator, ISam2GraphHandle]:
"""Composition-root factory — returns ``(estimator, handle)`` tuple.
@@ -1482,6 +1575,14 @@ def create(
handle is wired against it (the handle holds the estimator
reference for graph access), and the estimator is back-wired with
the handle so its ``add_*`` methods can call into it.
AZ-389 wiring (optional): if ``mid_flight_tile_writer`` is
provided AND ``config.components['c5_state'].orthorectifier.enabled``
is true, an :class:`Orthorectifier` is constructed and attached to
the estimator. ``camera_calibration``, ``flight_id``, and
``companion_id`` MUST be supplied alongside the writer; missing
any of them while orthorectifier is enabled raises
:class:`StateEstimatorConfigError`.
"""
estimator = GtsamIsam2StateEstimator(
config,
@@ -1492,6 +1593,31 @@ def create(
)
handle = ISam2GraphHandleImpl(estimator)
estimator.attach_handle(handle)
block: C5StateConfig = estimator._block
if mid_flight_tile_writer is not None and block.orthorectifier.enabled:
if camera_calibration is None or flight_id is None or companion_id is None:
raise StateEstimatorConfigError(
"AZ-389 orthorectifier requires camera_calibration + "
"flight_id + companion_id alongside mid_flight_tile_writer"
)
thresholds = OrthorectifierThresholds(
cov_norm_threshold=float(block.orthorectifier.cov_norm_threshold),
inlier_floor=int(block.orthorectifier.inlier_floor),
tile_size_meters=float(block.orthorectifier.tile_size_meters),
tile_size_pixels=int(block.orthorectifier.tile_size_pixels),
zoom_level=int(block.orthorectifier.zoom_level),
jpeg_quality=int(block.orthorectifier.jpeg_quality),
)
orthorectifier = Orthorectifier(
tile_writer=mid_flight_tile_writer,
thresholds=thresholds,
camera_calibration=camera_calibration,
flight_id=flight_id,
companion_id=companion_id,
)
estimator.attach_orthorectifier(orthorectifier)
return estimator, handle
@@ -1581,6 +1707,19 @@ def _make_timestamp_map(
# Module-level pure helpers (AZ-384).
def _imu_bias_norm(bias: Any) -> float:
"""Return ``sqrt(||accel_bias||^2 + ||gyro_bias||^2)`` for a :class:`ImuBias`.
Used by :meth:`GtsamIsam2StateEstimator._maybe_emit_mid_flight_tile`
to populate the AZ-389 quality metadata. Robust to ``None``
components (legacy fixtures) by treating them as zero.
"""
accel = getattr(bias, "accel_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0)
gyro = getattr(bias, "gyro_bias", (0.0, 0.0, 0.0)) or (0.0, 0.0, 0.0)
sq = sum(float(c) ** 2 for c in accel) + sum(float(c) ** 2 for c in gyro)
return sq**0.5
def _quat_from_pose3(pose: gtsam.Pose3) -> Quat:
"""Build a scalar-first :class:`Quat` from a ``gtsam.Pose3``.
@@ -21,6 +21,7 @@ binary may set ``OFF`` and only link the ESKF baseline.
from __future__ import annotations
import hashlib
import os
import threading
from collections.abc import Callable
@@ -37,7 +38,9 @@ from gps_denied_onboard.config import Config
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
pass
from datetime import datetime
from gps_denied_onboard._types.calibration import CameraCalibration
__all__ = [
"StateEstimatorFactory",
@@ -136,6 +139,10 @@ def build_state_estimator(
se3_utils: Any,
wgs_converter: Any,
fdr_client: Any,
tile_store: Any | None = None,
camera_calibration: "CameraCalibration | None" = None,
flight_id: str | None = None,
companion_id: str | None = None,
) -> tuple[StateEstimator, ISam2GraphHandle]:
"""Resolve + build the configured state estimator.
@@ -147,6 +154,16 @@ def build_state_estimator(
lookup. The first failure surfaces a :class:`StateEstimatorConfigError`
with the offending strategy + flag name so the operator gets a
clear next step.
AZ-389 wiring (optional): when ``tile_store`` is provided AND
``config.components['c5_state'].orthorectifier.enabled`` is true,
a :class:`_C6MidFlightIngestAdapter` wraps the C6 surface
(hiding ``TileMetadata`` / ``TileSource`` /
``FreshnessRejectionError`` so c5_state stays free of
cross-component imports per AZ-507) and is forwarded to the
estimator factory together with ``camera_calibration``,
``flight_id``, and ``companion_id``. The factory then constructs
the orthorectifier and attaches it to the estimator.
"""
block = _read_state_block(config)
strategy = block.strategy
@@ -169,16 +186,37 @@ def build_state_estimator(
f"state strategy {strategy!r} selected by config but not registered; "
f"registered strategies: {list_registered_state_strategies()}"
)
estimator, handle = factory(
config=config,
imu_preintegrator=imu_preintegrator,
se3_utils=se3_utils,
wgs_converter=wgs_converter,
fdr_client=fdr_client,
)
factory_kwargs: dict[str, Any] = {
"config": config,
"imu_preintegrator": imu_preintegrator,
"se3_utils": se3_utils,
"wgs_converter": wgs_converter,
"fdr_client": fdr_client,
}
if tile_store is not None and block.orthorectifier.enabled:
if camera_calibration is None or flight_id is None or companion_id is None:
raise StateEstimatorConfigError(
"AZ-389 orthorectifier enabled — tile_store must be paired with "
"camera_calibration + flight_id + companion_id "
"(missing: "
f"camera_calibration={camera_calibration is None}, "
f"flight_id={flight_id is None}, "
f"companion_id={companion_id is None})"
)
factory_kwargs["mid_flight_tile_writer"] = _C6MidFlightIngestAdapter(
tile_store=tile_store
)
factory_kwargs["camera_calibration"] = camera_calibration
factory_kwargs["flight_id"] = flight_id
factory_kwargs["companion_id"] = companion_id
estimator, handle = factory(**factory_kwargs)
_log_strategy_loaded(
strategy=strategy,
keyframe_window_size=block.keyframe_window_size,
orthorectifier_enabled=block.orthorectifier.enabled
and tile_store is not None,
)
return estimator, handle
@@ -199,16 +237,124 @@ def _read_state_block(config: Config) -> C5StateConfig:
)
def _log_strategy_loaded(*, strategy: str, keyframe_window_size: int) -> None:
def _log_strategy_loaded(
*,
strategy: str,
keyframe_window_size: int,
orthorectifier_enabled: bool,
) -> None:
log = get_logger("runtime_root.state_factory")
log.info(
f"c5.state.strategy_loaded: strategy={strategy} "
f"keyframe_window_size={keyframe_window_size}",
f"keyframe_window_size={keyframe_window_size} "
f"orthorectifier_enabled={orthorectifier_enabled}",
extra={
"kind": "c5.state.strategy_loaded",
"kv": {
"strategy": strategy,
"keyframe_window_size": keyframe_window_size,
"orthorectifier_enabled": orthorectifier_enabled,
},
},
)
# ----------------------------------------------------------------------
# AZ-389: composition-root bridge between c5_state and c6_tile_cache.
class _C6MidFlightIngestAdapter:
"""C5 ↔ C6 adapter for AZ-389 mid-flight tile candidates.
The c5_state component cannot import from c6_tile_cache (AZ-507;
``test_az270_compose_root.test_ac6_only_compose_root_imports_concrete_strategies``).
This adapter is the single allowed bridge: it lives in the
composition root, accepts the c6 :class:`TileStore` reference,
and exposes the
:class:`gps_denied_onboard.components.c5_state._orthorectifier.MidFlightTileWriter`
Protocol surface that c5 depends on.
Translation rules:
* ``write_mid_flight_tile`` builds the canonical
:class:`TileMetadata` (``TileSource.ONBOARD_INGEST`` +
``FreshnessLabel.FRESH`` + ``VotingStatus.PENDING``) and a
:class:`TileQualityMetadata` from the C5 inputs, hashes the
JPEG bytes, and calls :meth:`TileStore.write_tile` once
(atomic file + metadata insert).
* A :class:`FreshnessRejectionError` from C6 is translated to a
``None`` return — the orthorectifier swallows that silently
per AC-NEW-3.
* Any other C6 error propagates; the orthorectifier itself
wraps that in a WARNING log and returns ``None`` so the C5
``current_estimate`` path is never broken by tile-cache
infrastructure failures.
"""
__slots__ = ("_tile_store",)
def __init__(self, *, tile_store: Any) -> None:
self._tile_store = tile_store
def write_mid_flight_tile(
self,
*,
jpeg_bytes: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: "datetime",
flight_id: str,
companion_id: str,
estimator_label: str,
covariance_2x2_horizontal: tuple[
tuple[float, float], tuple[float, float]
],
last_anchor_age_ms: int,
mre_px: float,
imu_bias_norm: float,
) -> tuple[int, float, float] | None:
from gps_denied_onboard.components.c6_tile_cache._types import (
FreshnessLabel,
TileId,
TileMetadata,
TileQualityMetadata,
TileSource,
VotingStatus,
)
from gps_denied_onboard.components.c6_tile_cache.errors import (
FreshnessRejectionError,
)
tile_id = TileId(
zoom_level=int(zoom_level),
lat=float(lat),
lon=float(lon),
)
quality = TileQualityMetadata(
estimator_label=str(estimator_label),
covariance_2x2=covariance_2x2_horizontal,
last_anchor_age_ms=int(last_anchor_age_ms),
mre_px=float(mre_px),
imu_bias_norm=float(imu_bias_norm),
)
metadata = TileMetadata(
tile_id=tile_id,
tile_size_meters=float(tile_size_meters),
tile_size_pixels=int(tile_size_pixels),
capture_timestamp=capture_timestamp,
source=TileSource.ONBOARD_INGEST,
content_sha256_hex=hashlib.sha256(jpeg_bytes).hexdigest(),
freshness_label=FreshnessLabel.FRESH,
flight_id=str(flight_id),
companion_id=str(companion_id),
quality_metadata=quality,
voting_status=VotingStatus.PENDING,
)
try:
self._tile_store.write_tile(jpeg_bytes, metadata)
except FreshnessRejectionError:
return None
return (tile_id.zoom_level, tile_id.lat, tile_id.lon)
@@ -0,0 +1,715 @@
"""AZ-389 — C5 Orthorectifier → C6 mid-flight tile candidate emission.
Covers AC-1..AC-9 from
``_docs/02_tasks/todo/AZ-389_c5_orthorectifier_c6.md``.
Tests instantiate :class:`Orthorectifier` directly with a fake
:class:`MidFlightTileWriter`, then drive synthetic
``(NavCameraFrame, EstimatorOutput, cov_6x6, ...)`` tuples through
:meth:`Orthorectifier.try_emit_candidate`. The composition-root
adapter (``_C6MidFlightIngestAdapter``) is exercised separately
against fake C6 protocols.
Quality-gate / silent-return / freshness-rejection branches are
asserted via call-count checks against the fake writer; the
homography-geometry branch (AC-1) verifies that the ground-plane
homography projects a synthetic ground point to the expected image
pixel within 1 px (the "1-pixel tolerance" gate).
"""
from __future__ import annotations
import hashlib
import logging
from datetime import datetime, timezone
from typing import Any
from unittest import mock
from uuid import uuid4
import cv2
import numpy as np
import pytest
from gps_denied_onboard._types.calibration import CameraCalibration
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.nav import NavCameraFrame
from gps_denied_onboard._types.state import (
EstimatorOutput,
PoseSourceLabel,
Quat,
)
from gps_denied_onboard.components.c5_state._orthorectifier import (
MidFlightTileWriter,
Orthorectifier,
OrthorectifierThresholds,
_compose_tile_to_image_homography,
_ground_plane_homography,
_invert_se3,
_quat_to_rotation_matrix,
)
# ----------------------------------------------------------------------
# Fixtures + helpers.
_FLIGHT_ID = "flt-test-001"
_COMPANION_ID = "comp-test-001"
_ESTIMATOR_LABEL = "gtsam_isam2"
def _make_thresholds(
*,
cov_norm_threshold: float = 1.0,
inlier_floor: int = 30,
tile_size_meters: float = 100.0,
tile_size_pixels: int = 64,
zoom_level: int = 18,
jpeg_quality: int = 90,
) -> OrthorectifierThresholds:
return OrthorectifierThresholds(
cov_norm_threshold=cov_norm_threshold,
inlier_floor=inlier_floor,
tile_size_meters=tile_size_meters,
tile_size_pixels=tile_size_pixels,
zoom_level=zoom_level,
jpeg_quality=jpeg_quality,
)
def _nadir_calibration() -> CameraCalibration:
"""Nadir-looking camera, 500-pixel focal length, centred principal point."""
K = np.array(
[
[500.0, 0.0, 200.0],
[0.0, 500.0, 200.0],
[0.0, 0.0, 1.0],
],
dtype=np.float64,
)
# body_T_cam — camera +Z aligned with body +Z (i.e. body coordinate
# frame is camera coordinate frame). The orthorectifier will then
# use the pose's world_T_body directly as world_T_cam, which is the
# simplest geometric setup for a unit test.
body_T_cam = np.eye(4, dtype=np.float64)
return CameraCalibration(
camera_id="test_cam",
intrinsics_3x3=K,
distortion=np.zeros(5, dtype=np.float64),
body_to_camera_se3=body_T_cam,
acquisition_method="synthetic",
)
def _make_image(size: int = 400, dtype: Any = np.uint8) -> np.ndarray:
"""Synthetic 400x400 grayscale image — a single bright square at the centre."""
img = np.zeros((size, size), dtype=dtype)
cs = size // 2
img[cs - 20 : cs + 20, cs - 20 : cs + 20] = 255
return img
def _frame(frame_id: int = 1) -> NavCameraFrame:
return NavCameraFrame(
frame_id=frame_id,
timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
image=_make_image(),
camera_calibration_id="test_cam",
)
def _camera_looking_down_quat() -> Quat:
"""Quaternion for a body-frame camera whose +Z axis points along world -Z (nadir).
Built as a 180° rotation about world +X. This flips body +Z to
world -Z (so a point on the ground at world +Z = -altitude appears
in front of the camera) and body +Y to world -Y; the X axis is
preserved. The standard pinhole projection then maps ground (X, Y)
to pixel (cx + fx*X/h, cy - fy*Y/h) where ``h`` is altitude — the
expected nadir homography for unit tests.
"""
angle = np.pi
axis = np.array([1.0, 0.0, 0.0])
# q = (cos(theta/2), axis * sin(theta/2))
half = angle / 2.0
return Quat(
w=float(np.cos(half)),
x=float(axis[0] * np.sin(half)),
y=float(axis[1] * np.sin(half)),
z=float(axis[2] * np.sin(half)),
)
def _output(
*,
lat: float = 50.45,
lon: float = 30.52,
alt_m: float = 100.0,
cov: np.ndarray | None = None,
source_label: PoseSourceLabel = PoseSourceLabel.SATELLITE_ANCHORED,
quat: Quat | None = None,
) -> EstimatorOutput:
if cov is None:
cov = np.eye(6, dtype=np.float64) * 0.01
if quat is None:
quat = _camera_looking_down_quat()
return EstimatorOutput(
frame_id=uuid4(),
position_wgs84=LatLonAlt(lat_deg=lat, lon_deg=lon, alt_m=alt_m),
orientation_world_T_body=quat,
velocity_world_mps=(0.0, 0.0, 0.0),
covariance_6x6=cov,
source_label=source_label,
last_satellite_anchor_age_ms=50,
smoothed=False,
emitted_at=1_000_000_000,
)
class _RecordingWriter:
"""Fake :class:`MidFlightTileWriter` — records every call.
By default returns ``(zoom, lat, lon)`` from the supplied input
(i.e. simulates a successful insert). Configure ``return_value`` to
a literal value (or ``None`` to simulate the C6 freshness gate
rejecting the insert) to override per-test.
"""
def __init__(self, *, return_override: Any = ..., side_effect: Any = None) -> None:
self.calls: list[dict[str, Any]] = []
self._return_override = return_override
self._side_effect = side_effect
def write_mid_flight_tile(
self,
*,
jpeg_bytes: bytes,
zoom_level: int,
lat: float,
lon: float,
tile_size_meters: float,
tile_size_pixels: int,
capture_timestamp: datetime,
flight_id: str,
companion_id: str,
estimator_label: str,
covariance_2x2_horizontal: tuple[
tuple[float, float], tuple[float, float]
],
last_anchor_age_ms: int,
mre_px: float,
imu_bias_norm: float,
) -> tuple[int, float, float] | None:
kv = {
"jpeg_bytes": jpeg_bytes,
"zoom_level": zoom_level,
"lat": lat,
"lon": lon,
"tile_size_meters": tile_size_meters,
"tile_size_pixels": tile_size_pixels,
"capture_timestamp": capture_timestamp,
"flight_id": flight_id,
"companion_id": companion_id,
"estimator_label": estimator_label,
"covariance_2x2_horizontal": covariance_2x2_horizontal,
"last_anchor_age_ms": last_anchor_age_ms,
"mre_px": mre_px,
"imu_bias_norm": imu_bias_norm,
}
self.calls.append(kv)
if self._side_effect is not None:
raise self._side_effect
if self._return_override is ...:
return (zoom_level, lat, lon)
return self._return_override
def _make_orthorectifier(
*,
writer: MidFlightTileWriter | None = None,
thresholds: OrthorectifierThresholds | None = None,
calibration: CameraCalibration | None = None,
) -> tuple[Orthorectifier, _RecordingWriter]:
if writer is None:
writer = _RecordingWriter()
return (
Orthorectifier(
tile_writer=writer,
thresholds=thresholds or _make_thresholds(),
camera_calibration=calibration or _nadir_calibration(),
flight_id=_FLIGHT_ID,
companion_id=_COMPANION_ID,
),
writer, # type: ignore[return-value]
)
def _emit_kwargs(
*,
frame: NavCameraFrame | None = None,
output: EstimatorOutput | None = None,
cov_6x6: np.ndarray | None = None,
inlier_count: int = 60,
mre_px: float = 1.5,
source_label: PoseSourceLabel | None = None,
last_anchor_age_ms: int = 50,
imu_bias_norm: float = 0.01,
estimator_label: str = _ESTIMATOR_LABEL,
) -> dict[str, Any]:
out = output if output is not None else _output()
return {
"frame": frame if frame is not None else _frame(),
"pose_estimate": out,
"cov_6x6": cov_6x6 if cov_6x6 is not None else out.covariance_6x6,
"inlier_count": inlier_count,
"mre_px": mre_px,
"source_label": source_label or out.source_label,
"last_anchor_age_ms": last_anchor_age_ms,
"imu_bias_norm": imu_bias_norm,
"estimator_label": estimator_label,
}
# ----------------------------------------------------------------------
# AC-2 — covariance-norm gate.
def test_ac2_cov_norm_above_threshold_blocks_emission() -> None:
# Arrange
ortho, writer = _make_orthorectifier(
thresholds=_make_thresholds(cov_norm_threshold=1.0)
)
big_cov = np.eye(6, dtype=np.float64) * 10.0
out = _output(cov=big_cov)
# Act
result = ortho.try_emit_candidate(**_emit_kwargs(output=out, cov_6x6=big_cov))
# Assert
assert result is None
assert writer.calls == []
def test_ac2_cov_norm_below_threshold_emits() -> None:
# Arrange
ortho, writer = _make_orthorectifier(
thresholds=_make_thresholds(cov_norm_threshold=10.0)
)
# Act
result = ortho.try_emit_candidate(**_emit_kwargs())
# Assert
assert result is not None
assert len(writer.calls) == 1
# ----------------------------------------------------------------------
# AC-3 — source-label gate.
@pytest.mark.parametrize(
"label",
[PoseSourceLabel.VISUAL_PROPAGATED, PoseSourceLabel.DEAD_RECKONED],
)
def test_ac3_non_satellite_anchored_blocked(label: PoseSourceLabel) -> None:
# Arrange
ortho, writer = _make_orthorectifier()
out = _output(source_label=label)
# Act
result = ortho.try_emit_candidate(
**_emit_kwargs(output=out, source_label=label)
)
# Assert
assert result is None
assert writer.calls == []
# ----------------------------------------------------------------------
# AC-4 — once-per-frame rate limit.
def test_ac4_same_frame_id_processed_only_once() -> None:
# Arrange
ortho, writer = _make_orthorectifier()
frame = _frame(frame_id=42)
# Act
first = ortho.try_emit_candidate(**_emit_kwargs(frame=frame))
second = ortho.try_emit_candidate(**_emit_kwargs(frame=frame))
# Assert
assert first is not None
assert second is None
assert len(writer.calls) == 1
def test_ac4_distinct_frame_ids_each_emit() -> None:
# Arrange
ortho, writer = _make_orthorectifier()
# Act
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1)))
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2)))
# Assert
assert len(writer.calls) == 2
# ----------------------------------------------------------------------
# AC-8 — defensive missing-input skip.
@pytest.mark.parametrize(
"missing",
["frame", "pose_estimate", "cov_6x6"],
)
def test_ac8_missing_inputs_silent_return_none(missing: str) -> None:
# Arrange
ortho, writer = _make_orthorectifier()
kwargs = _emit_kwargs()
kwargs[missing] = None
# Act
result = ortho.try_emit_candidate(**kwargs)
# Assert
assert result is None
assert writer.calls == []
# ----------------------------------------------------------------------
# AC-5 / AC-6 — writer is called with the right metadata + SHA256.
def test_ac5_writer_called_with_onboard_ingest_metadata() -> None:
# Arrange
ortho, writer = _make_orthorectifier()
out = _output(lat=50.45, lon=30.52, alt_m=100.0)
# Act
ortho.try_emit_candidate(**_emit_kwargs(output=out))
# Assert — writer received the canonical onboard-ingest payload.
assert len(writer.calls) == 1
call = writer.calls[0]
assert call["zoom_level"] == 18
assert call["lat"] == pytest.approx(50.45)
assert call["lon"] == pytest.approx(30.52)
assert call["tile_size_meters"] == pytest.approx(100.0)
assert call["tile_size_pixels"] == 64
assert call["flight_id"] == _FLIGHT_ID
assert call["companion_id"] == _COMPANION_ID
assert call["estimator_label"] == _ESTIMATOR_LABEL
assert call["last_anchor_age_ms"] == 50
assert call["mre_px"] == pytest.approx(1.5)
assert call["imu_bias_norm"] == pytest.approx(0.01)
cov2x2 = call["covariance_2x2_horizontal"]
assert cov2x2[0][0] == pytest.approx(0.01)
assert cov2x2[1][1] == pytest.approx(0.01)
def test_ac6_jpeg_bytes_decode_to_expected_shape_and_quality() -> None:
# Arrange
ortho, writer = _make_orthorectifier(
thresholds=_make_thresholds(tile_size_pixels=128, jpeg_quality=80)
)
# Act
ortho.try_emit_candidate(**_emit_kwargs())
# Assert — emitted blob is a JPEG that decodes to the configured tile size.
assert len(writer.calls) == 1
jpeg_bytes = writer.calls[0]["jpeg_bytes"]
assert isinstance(jpeg_bytes, bytes)
assert len(jpeg_bytes) > 0
assert jpeg_bytes[:3] == b"\xff\xd8\xff" # JPEG SOI marker
decoded = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
assert decoded is not None
assert decoded.shape[:2] == (128, 128)
# ----------------------------------------------------------------------
# AC-9 — writer-side freshness rejection swallowed.
def test_ac9_writer_returning_none_swallowed() -> None:
# Arrange
writer = _RecordingWriter(return_override=None)
ortho, _ = _make_orthorectifier(writer=writer)
# Act
result = ortho.try_emit_candidate(**_emit_kwargs())
# Assert
assert result is None
assert len(writer.calls) == 1 # the writer WAS called
def test_ac9_writer_raising_swallowed_with_warning(caplog: pytest.LogCaptureFixture) -> None:
# Arrange
writer = _RecordingWriter(side_effect=RuntimeError("infra blew up"))
ortho, _ = _make_orthorectifier(writer=writer)
# Act
with caplog.at_level(logging.WARNING, logger="gps_denied_onboard.c5_state.orthorectifier"):
result = ortho.try_emit_candidate(**_emit_kwargs())
# Assert
assert result is None
assert any(
rec.message == "c5.state.orthorectifier_writer_failed" for rec in caplog.records
)
# ----------------------------------------------------------------------
# AC-7 — first-emission INFO log.
def test_ac7_first_emission_logs_info_subsequent_logs_debug(
caplog: pytest.LogCaptureFixture,
) -> None:
# Arrange
ortho, _ = _make_orthorectifier()
# Act
with caplog.at_level(logging.INFO, logger="gps_denied_onboard.c5_state.orthorectifier"):
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=1)))
ortho.try_emit_candidate(**_emit_kwargs(frame=_frame(frame_id=2)))
# Assert
info_records = [r for r in caplog.records if r.levelno == logging.INFO]
assert any(r.message == "c5.state.first_mid_flight_candidate" for r in info_records)
assert sum(r.message == "c5.state.first_mid_flight_candidate" for r in info_records) == 1
# ----------------------------------------------------------------------
# Inlier-floor gate (specified alongside AC-2 in the task body).
def test_inlier_floor_blocks_emission() -> None:
# Arrange
ortho, writer = _make_orthorectifier(
thresholds=_make_thresholds(inlier_floor=50)
)
# Act — inlier_count == floor is rejected (strict inequality).
result = ortho.try_emit_candidate(**_emit_kwargs(inlier_count=50))
# Assert
assert result is None
assert writer.calls == []
# ----------------------------------------------------------------------
# AC-1 — orthorectification correctness (geometry).
def test_ac1_homography_projects_origin_to_principal_point() -> None:
"""Camera straight down at altitude h, K = [fx 0 cx; 0 fy cy; 0 0 1].
Ground point (X=0, Y=0) lies under the camera. With the body-
looking-down convention (180° about world +X), the
``world_T_cam = world_T_body`` for ``body_T_cam = I``:
* Camera position in world = (0, 0, h)
* Camera Z axis points along world -Z (hits ground at Z=0)
The expected projection of ground (0, 0, 0) is the principal
point ``(cx, cy)`` to within sub-pixel tolerance.
"""
# Arrange
cal = _nadir_calibration()
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
out = _output(alt_m=100.0)
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
world_T_body = np.eye(4, dtype=np.float64)
world_T_body[:3, :3] = R
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
world_T_cam = world_T_body @ body_T_cam
cam_T_world = _invert_se3(world_T_cam)
H = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
# Act — project ground origin (X=0, Y=0, 1).
homog = H @ np.array([0.0, 0.0, 1.0])
pix = homog[:2] / homog[2]
# Assert
assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0)
def test_ac1_homography_projects_offset_within_one_pixel() -> None:
"""Ground (X=10 m, Y=0) at altitude 100 m with fx=500 should land at cx + 50."""
# Arrange
cal = _nadir_calibration()
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
out = _output(alt_m=100.0)
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
world_T_body = np.eye(4, dtype=np.float64)
world_T_body[:3, :3] = R
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
world_T_cam = world_T_body @ body_T_cam
cam_T_world = _invert_se3(world_T_cam)
H = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
# Act
homog = H @ np.array([10.0, 0.0, 1.0])
pix = homog[:2] / homog[2]
# Assert — fx=500, h=100 → 10 m on the ground projects to 50 px from cx.
# The 180° flip about world +X sends body +Y to world -Y so a +X ground
# offset stays at +X in image space (cx + 50) on the row of the
# principal point.
assert pix[0] == pytest.approx(250.0, abs=1.0)
assert pix[1] == pytest.approx(200.0, abs=1.0)
def test_compose_tile_to_image_homography_is_centred_on_camera() -> None:
"""Tile pixel ``(N/2, N/2)`` corresponds to the camera nadir on the ground."""
# Arrange
cal = _nadir_calibration()
K = np.asarray(cal.intrinsics_3x3, dtype=np.float64)
body_T_cam = np.asarray(cal.body_to_camera_se3, dtype=np.float64)
out = _output(alt_m=100.0)
R = _quat_to_rotation_matrix(out.orientation_world_T_body)
world_T_body = np.eye(4, dtype=np.float64)
world_T_body[:3, :3] = R
world_T_body[2, 3] = float(out.position_wgs84.alt_m)
world_T_cam = world_T_body @ body_T_cam
cam_T_world = _invert_se3(world_T_cam)
H_g2i = _ground_plane_homography(K=K, cam_T_world=cam_T_world)
N = 64
H_t2i = _compose_tile_to_image_homography(
H_ground_to_image=H_g2i,
ground_x_center=float(world_T_cam[0, 3]),
ground_y_center=float(world_T_cam[1, 3]),
tile_size_meters=100.0,
tile_size_pixels=N,
)
# Act — middle of the tile maps to the principal point of the camera image.
homog = H_t2i @ np.array([N / 2.0, N / 2.0, 1.0])
pix = homog[:2] / homog[2]
# Assert
assert pix == pytest.approx(np.array([200.0, 200.0]), abs=1.0)
# ----------------------------------------------------------------------
# Composition-root adapter — _C6MidFlightIngestAdapter.
def test_adapter_translates_freshness_rejection_to_none() -> None:
"""The adapter catches :class:`FreshnessRejectionError` and returns ``None`` so the orthorectifier silently drops the candidate."""
# Arrange
from gps_denied_onboard.components.c6_tile_cache.errors import (
FreshnessRejectionError,
)
from gps_denied_onboard.runtime_root.state_factory import (
_C6MidFlightIngestAdapter,
)
fake_store = mock.MagicMock()
fake_store.write_tile.side_effect = FreshnessRejectionError(
"rear_sector stale"
)
adapter = _C6MidFlightIngestAdapter(tile_store=fake_store)
# Act
result = adapter.write_mid_flight_tile(
jpeg_bytes=b"\xff\xd8\xff\xe0fakejpeg",
zoom_level=18,
lat=50.45,
lon=30.52,
tile_size_meters=100.0,
tile_size_pixels=64,
capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
flight_id=_FLIGHT_ID,
companion_id=_COMPANION_ID,
estimator_label=_ESTIMATOR_LABEL,
covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)),
last_anchor_age_ms=50,
mre_px=1.5,
imu_bias_norm=0.01,
)
# Assert
assert result is None
fake_store.write_tile.assert_called_once()
def test_adapter_calls_write_tile_with_onboard_ingest_metadata() -> None:
"""On a successful insert the adapter builds an ``ONBOARD_INGEST`` row, sets ``PENDING`` voting status, and computes the SHA256 of the JPEG."""
# Arrange
from gps_denied_onboard.components.c6_tile_cache._types import (
FreshnessLabel,
TileSource,
VotingStatus,
)
from gps_denied_onboard.runtime_root.state_factory import (
_C6MidFlightIngestAdapter,
)
fake_store = mock.MagicMock()
adapter = _C6MidFlightIngestAdapter(tile_store=fake_store)
jpeg_bytes = b"\xff\xd8\xff\xe0testjpegblob"
# Act
result = adapter.write_mid_flight_tile(
jpeg_bytes=jpeg_bytes,
zoom_level=18,
lat=50.45,
lon=30.52,
tile_size_meters=100.0,
tile_size_pixels=64,
capture_timestamp=datetime(2026, 5, 16, 12, 0, 0, tzinfo=timezone.utc),
flight_id=_FLIGHT_ID,
companion_id=_COMPANION_ID,
estimator_label=_ESTIMATOR_LABEL,
covariance_2x2_horizontal=((0.01, 0.0), (0.0, 0.01)),
last_anchor_age_ms=50,
mre_px=1.5,
imu_bias_norm=0.01,
)
# Assert — write_tile invoked with (blob, metadata).
fake_store.write_tile.assert_called_once()
args, kwargs = fake_store.write_tile.call_args
assert kwargs == {}
assert args[0] == jpeg_bytes
metadata = args[1]
assert metadata.source == TileSource.ONBOARD_INGEST
assert metadata.freshness_label == FreshnessLabel.FRESH
assert metadata.voting_status == VotingStatus.PENDING
assert metadata.flight_id == _FLIGHT_ID
assert metadata.companion_id == _COMPANION_ID
assert metadata.tile_id.zoom_level == 18
assert metadata.tile_id.lat == pytest.approx(50.45)
assert metadata.tile_id.lon == pytest.approx(30.52)
assert metadata.content_sha256_hex == hashlib.sha256(jpeg_bytes).hexdigest()
quality = metadata.quality_metadata
assert quality is not None
assert quality.estimator_label == _ESTIMATOR_LABEL
assert quality.covariance_2x2 == ((0.01, 0.0), (0.0, 0.01))
assert quality.last_anchor_age_ms == 50
assert result == (18, pytest.approx(50.45), pytest.approx(30.52))
# ----------------------------------------------------------------------
# OrthorectifierConfig validation (config block sanity).
def test_thresholds_reject_non_positive_cov_norm() -> None:
# Act / Assert
with pytest.raises(ValueError):
OrthorectifierThresholds(cov_norm_threshold=0.0, inlier_floor=10)
def test_thresholds_reject_negative_inlier_floor() -> None:
# Act / Assert
with pytest.raises(ValueError):
OrthorectifierThresholds(cov_norm_threshold=1.0, inlier_floor=-1)