From e86084da6b1e21add727cfb241ac26c8603de906 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Sun, 3 May 2026 18:01:13 +0300 Subject: [PATCH] [AZ-223] [AZ-224] [AZ-225] [AZ-227] Add runtime gateways Implement the first runtime component boundaries around the shared contracts so downstream batches can consume typed frame, MAVLink, tile, and FDR behavior with focused tests and batch evidence. Co-authored-by: Cursor --- .../AZ-223_camera_ingest_calibration.md | 0 .../AZ-224_mavlink_gcs_gateway.md | 0 .../AZ-225_tile_manager_cache_manifest.md | 0 .../AZ-227_fdr_event_recorder.md | 0 .../batch_04_cycle1_report.md | 47 +++++++ .../reviews/batch_04_review.md | 29 ++++ _docs/_autodev_state.md | 2 +- src/camera_ingest_calibration/__init__.py | 21 +++ src/camera_ingest_calibration/interfaces.py | 86 +++++++++++ src/camera_ingest_calibration/types.py | 71 +++++++++- src/fdr_observability/__init__.py | 21 +++ src/fdr_observability/interfaces.py | 108 ++++++++++++++ src/fdr_observability/types.py | 53 ++++++- src/mavlink_gcs_integration/__init__.py | 23 +++ src/mavlink_gcs_integration/interfaces.py | 73 ++++++++++ src/mavlink_gcs_integration/types.py | 81 ++++++++++- src/tile_manager/__init__.py | 19 +++ src/tile_manager/interfaces.py | 133 ++++++++++++++++++ src/tile_manager/types.py | 62 +++++++- tests/unit/test_camera_ingest_calibration.py | 76 ++++++++++ tests/unit/test_fdr_observability.py | 64 +++++++++ tests/unit/test_mavlink_gcs_integration.py | 72 ++++++++++ tests/unit/test_tile_manager.py | 78 ++++++++++ 23 files changed, 1106 insertions(+), 13 deletions(-) rename _docs/02_tasks/{todo => done}/AZ-223_camera_ingest_calibration.md (100%) rename _docs/02_tasks/{todo => done}/AZ-224_mavlink_gcs_gateway.md (100%) rename _docs/02_tasks/{todo => done}/AZ-225_tile_manager_cache_manifest.md (100%) rename _docs/02_tasks/{todo => done}/AZ-227_fdr_event_recorder.md (100%) create mode 100644 _docs/03_implementation/batch_04_cycle1_report.md create mode 100644 _docs/03_implementation/reviews/batch_04_review.md create mode 100644 tests/unit/test_camera_ingest_calibration.py create mode 100644 tests/unit/test_fdr_observability.py create mode 100644 tests/unit/test_mavlink_gcs_integration.py create mode 100644 tests/unit/test_tile_manager.py diff --git a/_docs/02_tasks/todo/AZ-223_camera_ingest_calibration.md b/_docs/02_tasks/done/AZ-223_camera_ingest_calibration.md similarity index 100% rename from _docs/02_tasks/todo/AZ-223_camera_ingest_calibration.md rename to _docs/02_tasks/done/AZ-223_camera_ingest_calibration.md diff --git a/_docs/02_tasks/todo/AZ-224_mavlink_gcs_gateway.md b/_docs/02_tasks/done/AZ-224_mavlink_gcs_gateway.md similarity index 100% rename from _docs/02_tasks/todo/AZ-224_mavlink_gcs_gateway.md rename to _docs/02_tasks/done/AZ-224_mavlink_gcs_gateway.md diff --git a/_docs/02_tasks/todo/AZ-225_tile_manager_cache_manifest.md b/_docs/02_tasks/done/AZ-225_tile_manager_cache_manifest.md similarity index 100% rename from _docs/02_tasks/todo/AZ-225_tile_manager_cache_manifest.md rename to _docs/02_tasks/done/AZ-225_tile_manager_cache_manifest.md diff --git a/_docs/02_tasks/todo/AZ-227_fdr_event_recorder.md b/_docs/02_tasks/done/AZ-227_fdr_event_recorder.md similarity index 100% rename from _docs/02_tasks/todo/AZ-227_fdr_event_recorder.md rename to _docs/02_tasks/done/AZ-227_fdr_event_recorder.md diff --git a/_docs/03_implementation/batch_04_cycle1_report.md b/_docs/03_implementation/batch_04_cycle1_report.md new file mode 100644 index 0000000..75bdbca --- /dev/null +++ b/_docs/03_implementation/batch_04_cycle1_report.md @@ -0,0 +1,47 @@ +# Batch Report + +**Batch**: 4 +**Tasks**: AZ-223_camera_ingest_calibration, AZ-224_mavlink_gcs_gateway, AZ-225_tile_manager_cache_manifest, AZ-227_fdr_event_recorder +**Date**: 2026-05-03 + +## Task Results + +| Task | Status | Files Modified | Tests | AC Coverage | Issues | +|------|--------|----------------|-------|-------------|--------| +| AZ-223_camera_ingest_calibration | Done | 4 files | Pass | 3/3 ACs covered | None | +| AZ-224_mavlink_gcs_gateway | Done | 4 files | Pass | 3/3 ACs covered | None | +| AZ-225_tile_manager_cache_manifest | Done | 4 files | Pass | 3/3 ACs covered | None | +| AZ-227_fdr_event_recorder | Done | 4 files | Pass | 3/3 ACs covered | None | + +## AC Test Coverage: All covered + +| AC Ref | Coverage | +|--------|----------| +| AZ-223 AC-1 | `test_valid_frame_packet_contains_metadata_reports_and_normalization_hint` verifies timestamp, calibration, quality, occlusion, and normalization metadata. | +| AZ-223 AC-2 | `test_total_occlusion_marks_frame_unusable_for_vio_and_anchor` verifies blackout frames are unavailable for visual paths. | +| AZ-223 AC-3 | `test_raw_frame_payload_retention_is_rejected` verifies raw frame payload retention is rejected. | +| AZ-224 AC-1 | `test_telemetry_subscription_emits_normalized_sample` verifies normalized shared telemetry samples. | +| AZ-224 AC-2 | `test_invalid_gps_input_estimate_is_rejected_without_emission` verifies unsafe `GPS_INPUT` requests are rejected without emission. | +| AZ-224 AC-3 | `test_operator_status_messages_are_rate_limited_by_text` verifies QGC-visible status rate limiting. | +| AZ-225 AC-1 | `test_valid_cache_manifest_activates_trusted_records` verifies valid cache activation. | +| AZ-225 AC-2 | `test_tampered_or_stale_tile_is_rejected_with_auditable_reason` verifies hash and freshness rejection reasons. | +| AZ-225 AC-3 | `test_tile_metadata_lookup_returns_record_or_explicit_rejection` verifies trusted metadata lookup and explicit rejection. | +| AZ-227 AC-1 | `test_valid_event_append_indexes_metadata_and_payload_reference` verifies event metadata and payload references are stored within bounds. | +| AZ-227 AC-2 | `test_rollover_threshold_records_explicit_rollover_result` verifies rollover is explicit. | +| AZ-227 AC-3 | `test_export_request_produces_queryable_evidence_artifacts` verifies export evidence and analytics references. | + +## Code Review Verdict: PASS + +Review report: `_docs/03_implementation/reviews/batch_04_review.md` + +## Auto-Fix Attempts: 0 + +## Stuck Agents: None + +## Verification + +- `.venv/bin/python -m black --check src tests e2e/replay` passed. +- `.venv/bin/python -m ruff check src tests e2e/replay` passed. +- `.venv/bin/python -m pytest` passed: 29 tests. + +## Next Batch: AZ-226_generated_tile_orthorectification diff --git a/_docs/03_implementation/reviews/batch_04_review.md b/_docs/03_implementation/reviews/batch_04_review.md new file mode 100644 index 0000000..4f1a7a8 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_04_review.md @@ -0,0 +1,29 @@ +# Code Review Report + +**Batch**: AZ-223_camera_ingest_calibration, AZ-224_mavlink_gcs_gateway, AZ-225_tile_manager_cache_manifest, AZ-227_fdr_event_recorder +**Date**: 2026-05-03 +**Verdict**: PASS + +## Findings + +No findings. + +## Spec Compliance + +| Task | AC Coverage | Evidence | +|------|-------------|----------| +| AZ-223 | 3/3 covered | `tests/unit/test_camera_ingest_calibration.py` verifies packet metadata, blackout unusability, and raw-frame retention rejection. | +| AZ-224 | 3/3 covered | `tests/unit/test_mavlink_gcs_integration.py` verifies telemetry normalization, invalid GPS_INPUT rejection, and QGC status rate limiting. | +| AZ-225 | 3/3 covered | `tests/unit/test_tile_manager.py` verifies trusted cache activation, tamper/staleness rejection, and explicit metadata lookup rejection. | +| AZ-227 | 3/3 covered | `tests/unit/test_fdr_observability.py` verifies append/index behavior, rollover reporting, and export evidence artifacts. | + +## Architecture Compliance + +- Component writes stayed within the owning package directories declared in `_docs/02_document/module-layout.md`. +- Cross-component imports use shared public contracts and shared error envelopes only. +- No direct imports of another runtime component's internal modules were introduced. + +## Verification + +- `.venv/bin/python -m ruff check src/camera_ingest_calibration src/mavlink_gcs_integration src/tile_manager src/fdr_observability tests/unit/test_camera_ingest_calibration.py tests/unit/test_mavlink_gcs_integration.py tests/unit/test_tile_manager.py tests/unit/test_fdr_observability.py` passed. +- `.venv/bin/python -m pytest tests/unit/test_camera_ingest_calibration.py tests/unit/test_mavlink_gcs_integration.py tests/unit/test_tile_manager.py tests/unit/test_fdr_observability.py` passed: 12 tests. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index bf83571..2da5ff9 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -9,6 +9,6 @@ tracker: jira sub_step: phase: 1 name: batch-loop - detail: "batch 3: AZ-221_shared_geometry_time_sync, AZ-222_runtime_config_errors_telemetry" + detail: "batch 4: AZ-223_camera_ingest_calibration, AZ-224_mavlink_gcs_gateway, AZ-225_tile_manager_cache_manifest, AZ-227_fdr_event_recorder" retry_count: 0 cycle: 1 diff --git a/src/camera_ingest_calibration/__init__.py b/src/camera_ingest_calibration/__init__.py index a753ca0..f01decd 100644 --- a/src/camera_ingest_calibration/__init__.py +++ b/src/camera_ingest_calibration/__init__.py @@ -1 +1,22 @@ """Camera ingest and calibration component.""" + +from .interfaces import CameraFrameIngestor, FrameProvider +from .types import ( + CalibrationMetadata, + FrameQualityReport, + IngestedFramePacket, + NavigationFrame, + NormalizationHint, + OcclusionReport, +) + +__all__ = [ + "CalibrationMetadata", + "CameraFrameIngestor", + "FrameProvider", + "FrameQualityReport", + "IngestedFramePacket", + "NavigationFrame", + "NormalizationHint", + "OcclusionReport", +] diff --git a/src/camera_ingest_calibration/interfaces.py b/src/camera_ingest_calibration/interfaces.py index 85fd40f..c66a58e 100644 --- a/src/camera_ingest_calibration/interfaces.py +++ b/src/camera_ingest_calibration/interfaces.py @@ -2,9 +2,95 @@ from typing import Any, Protocol +from shared.contracts import FramePacket + +from .types import ( + CalibrationMetadata, + FrameQualityReport, + IngestedFramePacket, + NavigationFrame, + NormalizationHint, + OcclusionReport, +) + class FrameProvider(Protocol): """Source of navigation frames for downstream localization components.""" def next_frame(self) -> Any: """Return the next frame packet.""" + + +class CameraFrameIngestor: + """Build metadata-only frame packets for downstream localization components.""" + + def ingest( + self, + frame: NavigationFrame, + calibration: CalibrationMetadata, + ) -> IngestedFramePacket: + quality = self.classify_quality(frame) + occlusion = self.detect_occlusion(frame) + hint = NormalizationHint( + north_up_degrees=frame.north_up_degrees, + should_normalize_downstream=frame.north_up_degrees not in (None, 0.0), + ) + contract = FramePacket( + frame_id=frame.frame_id, + timestamp_ns=frame.timestamp_ns, + image_ref=frame.image_ref, + calibration_id=calibration.calibration_id, + occlusion=occlusion.state, + quality=quality.score, + normalization_hint=( + f"north_up_degrees={hint.north_up_degrees}" + if hint.should_normalize_downstream + else None + ), + raw_frame_retained=False, + ) + + return IngestedFramePacket( + contract=contract, + quality_report=quality, + occlusion_report=occlusion, + normalization_hint=hint, + ) + + def classify_quality(self, frame: NavigationFrame) -> FrameQualityReport: + if not frame.readable: + return FrameQualityReport(score=0.0, state="unusable", reasons=("unreadable",)) + + score = min(frame.mean_luma, frame.contrast) + reasons: list[str] = [] + if frame.mean_luma < 0.05: + reasons.append("blackout") + if frame.contrast < 0.05: + reasons.append("low_contrast") + + if reasons: + return FrameQualityReport(score=score, state="unusable", reasons=tuple(reasons)) + if score < 0.25: + return FrameQualityReport(score=score, state="degraded", reasons=("low_quality",)) + return FrameQualityReport(score=score, state="usable") + + def detect_occlusion(self, frame: NavigationFrame) -> OcclusionReport: + if not frame.readable: + return OcclusionReport( + state="unreadable", + usable_for_vio=False, + usable_for_anchor=False, + ) + if frame.mean_luma < 0.05 or frame.contrast < 0.05: + return OcclusionReport( + state="total", + usable_for_vio=False, + usable_for_anchor=False, + ) + if frame.mean_luma < 0.25 or frame.contrast < 0.25: + return OcclusionReport( + state="partial", + usable_for_vio=True, + usable_for_anchor=False, + ) + return OcclusionReport(state="clear", usable_for_vio=True, usable_for_anchor=True) diff --git a/src/camera_ingest_calibration/types.py b/src/camera_ingest_calibration/types.py index 3ff1b93..9bb1bdc 100644 --- a/src/camera_ingest_calibration/types.py +++ b/src/camera_ingest_calibration/types.py @@ -1,5 +1,70 @@ -"""Public camera ingest type aliases.""" +"""Public camera ingest models.""" -from typing import Any +from typing import Literal -FramePacketLike = Any +from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt, PositiveFloat +from pydantic import model_validator + +from shared.contracts import FramePacket + + +class CameraIngestModel(BaseModel): + model_config = ConfigDict(extra="forbid", frozen=True) + + +class CalibrationMetadata(CameraIngestModel): + calibration_id: str = Field(min_length=1) + camera_model: str = Field(min_length=1) + image_width_px: int = Field(gt=0) + image_height_px: int = Field(gt=0) + focal_length_px: PositiveFloat + distortion_model: str = Field(min_length=1) + + +class NavigationFrame(CameraIngestModel): + frame_id: str = Field(min_length=1) + timestamp_ns: NonNegativeInt + image_ref: str = Field(min_length=1) + readable: bool = True + mean_luma: float = Field(ge=0.0, le=1.0) + contrast: float = Field(ge=0.0, le=1.0) + north_up_degrees: float | None = Field(default=None, ge=-180.0, le=180.0) + raw_frame_retained: bool = False + + @model_validator(mode="after") + def raw_payload_must_not_be_retained(self) -> "NavigationFrame": + if self.raw_frame_retained: + raise ValueError("camera ingest must retain references only, not raw frames") + return self + + +class FrameQualityReport(CameraIngestModel): + score: float = Field(ge=0.0, le=1.0) + state: Literal["usable", "degraded", "unusable"] + reasons: tuple[str, ...] = () + + +class OcclusionReport(CameraIngestModel): + state: Literal["clear", "partial", "total", "unreadable"] + usable_for_vio: bool + usable_for_anchor: bool + + +class NormalizationHint(CameraIngestModel): + north_up_degrees: float | None = Field(default=None, ge=-180.0, le=180.0) + should_normalize_downstream: bool = False + + +class IngestedFramePacket(CameraIngestModel): + contract: FramePacket + quality_report: FrameQualityReport + occlusion_report: OcclusionReport + normalization_hint: NormalizationHint + + @property + def usable_for_vio(self) -> bool: + return self.occlusion_report.usable_for_vio and self.quality_report.state != "unusable" + + @property + def usable_for_anchor(self) -> bool: + return self.occlusion_report.usable_for_anchor and self.quality_report.state != "unusable" diff --git a/src/fdr_observability/__init__.py b/src/fdr_observability/__init__.py index 3ce1fa4..fed8776 100644 --- a/src/fdr_observability/__init__.py +++ b/src/fdr_observability/__init__.py @@ -1 +1,22 @@ """Flight data recorder and observability component.""" + +from .interfaces import FlightRecorder, InMemoryFlightRecorder +from .types import ( + FdrAppendResult, + FdrExportRequest, + FdrExportResult, + FdrHealth, + FdrPayload, + FdrSegmentSummary, +) + +__all__ = [ + "FdrAppendResult", + "FdrExportRequest", + "FdrExportResult", + "FdrHealth", + "FdrPayload", + "FdrSegmentSummary", + "FlightRecorder", + "InMemoryFlightRecorder", +] diff --git a/src/fdr_observability/interfaces.py b/src/fdr_observability/interfaces.py index 920749c..1f07835 100644 --- a/src/fdr_observability/interfaces.py +++ b/src/fdr_observability/interfaces.py @@ -2,6 +2,18 @@ from typing import Any, Protocol +from shared.contracts import FdrEvent +from shared.errors import ErrorEnvelope + +from .types import ( + FdrAppendResult, + FdrExportRequest, + FdrExportResult, + FdrHealth, + FdrPayload, + FdrSegmentSummary, +) + class FlightRecorder(Protocol): """Append-only event recorder for runtime evidence.""" @@ -11,3 +23,99 @@ class FlightRecorder(Protocol): def export(self) -> Any: """Export recorded evidence for post-flight analysis.""" + + +class InMemoryFlightRecorder: + """Bounded append-only recorder for runtime evidence metadata.""" + + def __init__(self, segment_limit_bytes: int, storage_limit_bytes: int) -> None: + if segment_limit_bytes <= 0: + raise ValueError("segment_limit_bytes must be positive") + if storage_limit_bytes < segment_limit_bytes: + raise ValueError("storage_limit_bytes must be at least one segment") + self._segment_limit_bytes = segment_limit_bytes + self._storage_limit_bytes = storage_limit_bytes + self._segments: list[list[FdrEvent]] = [[]] + self._segment_bytes: list[int] = [0] + self._used_bytes = 0 + + @property + def health(self) -> FdrHealth: + if self._used_bytes >= self._storage_limit_bytes: + return FdrHealth( + status="critical", + used_bytes=self._used_bytes, + max_bytes=self._storage_limit_bytes, + message="fdr storage limit reached", + ) + if self._used_bytes >= int(self._storage_limit_bytes * 0.9): + return FdrHealth( + status="degraded", + used_bytes=self._used_bytes, + max_bytes=self._storage_limit_bytes, + message="fdr storage nearing limit", + ) + return FdrHealth( + status="ready", + used_bytes=self._used_bytes, + max_bytes=self._storage_limit_bytes, + message="fdr storage ready", + ) + + def append_event(self, event: FdrEvent, payload: FdrPayload) -> FdrAppendResult: + if self._used_bytes + payload.size_bytes > self._storage_limit_bytes: + return FdrAppendResult( + appended=False, + error=ErrorEnvelope( + component="fdr_observability", + category="resource", + message="fdr storage limit reached", + severity="critical", + retryable=False, + ), + ) + + rollover = False + if self._segment_bytes[-1] + payload.size_bytes > self._segment_limit_bytes: + self._segments.append([]) + self._segment_bytes.append(0) + rollover = True + + segment_index = len(self._segments) - 1 + stored_event = event.model_copy(update={"payload_ref": payload.ref}) + self._segments[segment_index].append(stored_event) + self._segment_bytes[segment_index] += payload.size_bytes + self._used_bytes += payload.size_bytes + + return FdrAppendResult( + appended=True, + event=stored_event, + segment_id=self._segment_id(segment_index), + rollover=rollover, + ) + + def export(self, request: FdrExportRequest) -> FdrExportResult: + segments = tuple( + FdrSegmentSummary( + segment_id=self._segment_id(index), + event_count=len(events), + bytes_used=self._segment_bytes[index], + ) + for index, events in enumerate(self._segments) + if events + ) + evidence_ref = f"fdr://exports/{request.mission_id}/{request.run_id}/evidence.json" + analytics_ref = ( + f"fdr://exports/{request.mission_id}/{request.run_id}/analytics.parquet" + if request.include_analytics + else None + ) + return FdrExportResult( + produced=True, + evidence_ref=evidence_ref, + segments=segments, + analytics_ref=analytics_ref, + ) + + def _segment_id(self, index: int) -> str: + return f"segment-{index + 1:04d}" diff --git a/src/fdr_observability/types.py b/src/fdr_observability/types.py index aeae7b2..e1afac2 100644 --- a/src/fdr_observability/types.py +++ b/src/fdr_observability/types.py @@ -1,5 +1,52 @@ -"""Public FDR type aliases.""" +"""Public FDR models.""" -from typing import Any +from typing import Literal -FdrEventLike = Any +from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt, PositiveInt + +from shared.contracts import FdrEvent +from shared.errors import ErrorEnvelope + + +class FdrModel(BaseModel): + model_config = ConfigDict(extra="forbid", frozen=True) + + +class FdrPayload(FdrModel): + ref: str = Field(min_length=1) + size_bytes: PositiveInt + redacted: bool = True + + +class FdrAppendResult(FdrModel): + appended: bool + event: FdrEvent | None = None + segment_id: str | None = None + rollover: bool = False + error: ErrorEnvelope | None = None + + +class FdrSegmentSummary(FdrModel): + segment_id: str = Field(min_length=1) + event_count: NonNegativeInt + bytes_used: NonNegativeInt + + +class FdrHealth(FdrModel): + status: Literal["ready", "degraded", "critical"] + used_bytes: NonNegativeInt + max_bytes: PositiveInt + message: str + + +class FdrExportRequest(FdrModel): + mission_id: str = Field(min_length=1) + run_id: str = Field(min_length=1) + include_analytics: bool = False + + +class FdrExportResult(FdrModel): + produced: bool + evidence_ref: str = Field(min_length=1) + segments: tuple[FdrSegmentSummary, ...] + analytics_ref: str | None = None diff --git a/src/mavlink_gcs_integration/__init__.py b/src/mavlink_gcs_integration/__init__.py index b829296..9e3946b 100644 --- a/src/mavlink_gcs_integration/__init__.py +++ b/src/mavlink_gcs_integration/__init__.py @@ -1 +1,24 @@ """MAVLink and GCS integration component.""" + +from .interfaces import InMemoryMavlinkGateway, MavlinkGateway +from .types import ( + FlightControllerTelemetry, + GpsEmissionResult, + GpsInputPacket, + OperatorStatusMessage, + StatusEmissionResult, + gps_input_from_estimate, + normalize_telemetry, +) + +__all__ = [ + "FlightControllerTelemetry", + "GpsEmissionResult", + "GpsInputPacket", + "InMemoryMavlinkGateway", + "MavlinkGateway", + "OperatorStatusMessage", + "StatusEmissionResult", + "gps_input_from_estimate", + "normalize_telemetry", +] diff --git a/src/mavlink_gcs_integration/interfaces.py b/src/mavlink_gcs_integration/interfaces.py index 8c6b897..5fa2a5c 100644 --- a/src/mavlink_gcs_integration/interfaces.py +++ b/src/mavlink_gcs_integration/interfaces.py @@ -2,6 +2,20 @@ from typing import Any, Protocol +from pydantic import ValidationError + +from shared.contracts import PositionEstimate, TelemetrySample +from shared.errors import ErrorEnvelope + +from .types import ( + FlightControllerTelemetry, + GpsEmissionResult, + OperatorStatusMessage, + StatusEmissionResult, + gps_input_from_estimate, + normalize_telemetry, +) + class MavlinkGateway(Protocol): """Bridges FC telemetry inputs and localization GPS_INPUT outputs.""" @@ -11,3 +25,62 @@ class MavlinkGateway(Protocol): def emit_gps_input(self, estimate: Any) -> None: """Emit one localization estimate to the flight controller.""" + + +class InMemoryMavlinkGateway: + """Deterministic gateway boundary used by runtime adapters and tests.""" + + def __init__(self, status_rate_limit_ns: int) -> None: + if status_rate_limit_ns < 0: + raise ValueError("status_rate_limit_ns must be non-negative") + self._status_rate_limit_ns = status_rate_limit_ns + self._last_status_timestamp_by_text: dict[str, int] = {} + self.emitted_gps_inputs: list[object] = [] + self.emitted_status_messages: list[OperatorStatusMessage] = [] + + def subscribe_telemetry( + self, + samples: list[FlightControllerTelemetry], + ) -> tuple[TelemetrySample, ...]: + return tuple(normalize_telemetry(sample) for sample in samples) + + def emit_gps_input(self, estimate: PositionEstimate) -> GpsEmissionResult: + try: + packet = gps_input_from_estimate(estimate) + except ValidationError as error: + return GpsEmissionResult( + emitted=False, + error=ErrorEnvelope( + component="mavlink_gcs_integration", + category="validation", + message="position estimate is unsafe for GPS_INPUT emission", + severity="error", + retryable=False, + cause=str(error), + ), + ) + + self.emitted_gps_inputs.append(packet) + return GpsEmissionResult(emitted=True, packet=packet) + + def emit_status( + self, + messages: list[OperatorStatusMessage], + ) -> StatusEmissionResult: + emitted: list[OperatorStatusMessage] = [] + suppressed: list[OperatorStatusMessage] = [] + + for message in messages: + last_timestamp = self._last_status_timestamp_by_text.get(message.text) + if ( + last_timestamp is not None + and message.timestamp_ns - last_timestamp < self._status_rate_limit_ns + ): + suppressed.append(message) + continue + + self._last_status_timestamp_by_text[message.text] = message.timestamp_ns + self.emitted_status_messages.append(message) + emitted.append(message) + + return StatusEmissionResult(emitted=tuple(emitted), suppressed=tuple(suppressed)) diff --git a/src/mavlink_gcs_integration/types.py b/src/mavlink_gcs_integration/types.py index f9ca9bd..01f07d6 100644 --- a/src/mavlink_gcs_integration/types.py +++ b/src/mavlink_gcs_integration/types.py @@ -1,5 +1,80 @@ -"""Public MAVLink/GCS type aliases.""" +"""Public MAVLink/GCS models.""" -from typing import Any +from typing import Literal -TelemetrySampleLike = Any +from pydantic import BaseModel, ConfigDict, Field, NonNegativeFloat, NonNegativeInt + +from shared.contracts import PositionEstimate, TelemetrySample +from shared.errors import ErrorEnvelope + + +class MavlinkModel(BaseModel): + model_config = ConfigDict(extra="forbid", frozen=True) + + +class FlightControllerTelemetry(MavlinkModel): + timestamp_ns: NonNegativeInt + acceleration_mps2: tuple[float, float, float] + attitude_rad: tuple[float, float, float] + altitude_m: float + airspeed_mps: NonNegativeFloat + gps_health: Literal["healthy", "degraded", "lost", "spoofed"] + + +class GpsInputPacket(MavlinkModel): + timestamp_ns: NonNegativeInt + latitude_deg: float = Field(ge=-90.0, le=90.0) + longitude_deg: float = Field(ge=-180.0, le=180.0) + altitude_m: float + fix_type: int = Field(ge=2, le=3) + horizontal_accuracy_m: NonNegativeFloat + source_label: str = Field(min_length=1) + + +class GpsEmissionResult(MavlinkModel): + emitted: bool + packet: GpsInputPacket | None = None + error: ErrorEnvelope | None = None + + +class OperatorStatusMessage(MavlinkModel): + timestamp_ns: NonNegativeInt + severity: Literal["info", "warning", "error", "critical"] + text: str = Field(min_length=1) + visible_to_qgc: bool = True + + +class StatusEmissionResult(MavlinkModel): + emitted: tuple[OperatorStatusMessage, ...] + suppressed: tuple[OperatorStatusMessage, ...] = () + + +def normalize_telemetry(sample: FlightControllerTelemetry) -> TelemetrySample: + return TelemetrySample( + timestamp_ns=sample.timestamp_ns, + imu={ + "accel_x": sample.acceleration_mps2[0], + "accel_y": sample.acceleration_mps2[1], + "accel_z": sample.acceleration_mps2[2], + }, + attitude={ + "roll": sample.attitude_rad[0], + "pitch": sample.attitude_rad[1], + "yaw": sample.attitude_rad[2], + }, + altitude_m=sample.altitude_m, + airspeed_mps=sample.airspeed_mps, + gps_health=sample.gps_health, + ) + + +def gps_input_from_estimate(estimate: PositionEstimate) -> GpsInputPacket: + return GpsInputPacket( + timestamp_ns=estimate.timestamp_ns, + latitude_deg=estimate.latitude_deg, + longitude_deg=estimate.longitude_deg, + altitude_m=estimate.altitude_m, + fix_type=estimate.fix_type, + horizontal_accuracy_m=estimate.horizontal_accuracy_m, + source_label=estimate.source_label, + ) diff --git a/src/tile_manager/__init__.py b/src/tile_manager/__init__.py index fafc336..2cf13da 100644 --- a/src/tile_manager/__init__.py +++ b/src/tile_manager/__init__.py @@ -1 +1,20 @@ """Tile cache and generated tile lifecycle component.""" + +from .interfaces import LocalTileManager, TileManager +from .types import ( + CacheValidationReport, + TileManifestEntry, + TileMetadataLookup, + TileValidationDecision, + freshness_status, +) + +__all__ = [ + "CacheValidationReport", + "LocalTileManager", + "TileManager", + "TileManifestEntry", + "TileMetadataLookup", + "TileValidationDecision", + "freshness_status", +] diff --git a/src/tile_manager/interfaces.py b/src/tile_manager/interfaces.py index cd0e238..9f98341 100644 --- a/src/tile_manager/interfaces.py +++ b/src/tile_manager/interfaces.py @@ -1,7 +1,19 @@ """Public tile manager interfaces.""" +from datetime import datetime from typing import Any, Protocol +from shared.contracts import CacheTileRecord +from shared.errors import ErrorEnvelope + +from .types import ( + CacheValidationReport, + TileManifestEntry, + TileMetadataLookup, + TileValidationDecision, + freshness_status, +) + class TileManager(Protocol): """Validates and serves local cache tile records.""" @@ -11,3 +23,124 @@ class TileManager(Protocol): def get_tile_window(self, footprint: Any) -> list[Any]: """Return tiles intersecting a requested footprint.""" + + +class LocalTileManager: + """Validates preloaded local cache metadata and serves trusted tile records.""" + + def __init__( + self, + trusted_signature_hashes: set[str], + now: datetime, + postgis_available: bool = True, + ) -> None: + self._trusted_signature_hashes = trusted_signature_hashes + self._now = now + self._postgis_available = postgis_available + self._trusted_by_tile_id: dict[str, CacheTileRecord] = {} + self._descriptor_by_tile_id: dict[str, str] = {} + self._tile_id_by_chunk_id: dict[str, str] = {} + + def validate_cache(self, entries: list[TileManifestEntry]) -> CacheValidationReport: + if not self._postgis_available: + decisions = tuple( + TileValidationDecision( + tile_id=entry.tile_id, + accepted=False, + reason="postgis_unavailable", + ) + for entry in entries + ) + return CacheValidationReport(activated=False, decisions=decisions) + + decisions = tuple(self._validate_entry(entry) for entry in entries) + self._trusted_by_tile_id = { + decision.record.tile_id: decision.record + for decision in decisions + if decision.record is not None + } + self._descriptor_by_tile_id = { + entry.tile_id: entry.descriptor_ref + for entry in entries + if entry.tile_id in self._trusted_by_tile_id + } + self._tile_id_by_chunk_id = { + entry.chunk_id: entry.tile_id + for entry in entries + if entry.tile_id in self._trusted_by_tile_id + } + + return CacheValidationReport( + activated=bool(self._trusted_by_tile_id) + and all(decision.accepted for decision in decisions), + decisions=decisions, + ) + + def get_tile_window(self, footprint: Any) -> list[CacheTileRecord]: + if isinstance(footprint, dict) and "chunk_id" in footprint: + tile_id = self._tile_id_by_chunk_id.get(str(footprint["chunk_id"])) + return [self._trusted_by_tile_id[tile_id]] if tile_id is not None else [] + return list(self._trusted_by_tile_id.values()) + + def get_tile_metadata(self, chunk_id: str) -> TileMetadataLookup: + tile_id = self._tile_id_by_chunk_id.get(chunk_id) + if tile_id is None: + return TileMetadataLookup( + found=False, + error=ErrorEnvelope( + component="tile_manager", + category="validation", + message=f"no trusted tile metadata for chunk {chunk_id}", + severity="warning", + retryable=False, + ), + ) + + return TileMetadataLookup( + found=True, + record=self._trusted_by_tile_id[tile_id], + descriptor_ref=self._descriptor_by_tile_id[tile_id], + ) + + def _validate_entry(self, entry: TileManifestEntry) -> TileValidationDecision: + if entry.signature_hash not in self._trusted_signature_hashes: + return TileValidationDecision( + tile_id=entry.tile_id, + accepted=False, + reason="signature_not_trusted", + ) + + if entry.content_hash != entry.expected_content_hash: + return TileValidationDecision( + tile_id=entry.tile_id, + accepted=False, + reason="content_hash_mismatch", + ) + + if entry.sidecar_hash != entry.expected_sidecar_hash: + return TileValidationDecision( + tile_id=entry.tile_id, + accepted=False, + reason="sidecar_hash_mismatch", + ) + + freshness = freshness_status(entry.expires_at, self._now) + if freshness == "stale": + return TileValidationDecision(tile_id=entry.tile_id, accepted=False, reason="stale") + + record = CacheTileRecord( + tile_id=entry.tile_id, + crs=entry.crs, + meters_per_pixel=entry.meters_per_pixel, + capture_date=entry.capture_date, + signature_hash=entry.signature_hash, + trust_level="trusted", + freshness_status=freshness, + provenance=entry.provenance, + ) + return TileValidationDecision( + tile_id=entry.tile_id, + accepted=True, + reason="trusted", + record=record, + ) diff --git a/src/tile_manager/types.py b/src/tile_manager/types.py index ff74274..bc074cb 100644 --- a/src/tile_manager/types.py +++ b/src/tile_manager/types.py @@ -1,5 +1,61 @@ -"""Public tile manager type aliases.""" +"""Public tile manager models.""" -from typing import Any +from datetime import date, datetime, timezone +from typing import Literal -CacheTileRecordLike = Any +from pydantic import BaseModel, ConfigDict, Field, PositiveFloat + +from shared.contracts import CacheTileRecord +from shared.errors import ErrorEnvelope + + +class TileManagerModel(BaseModel): + model_config = ConfigDict(extra="forbid", frozen=True) + + +class TileManifestEntry(TileManagerModel): + tile_id: str = Field(min_length=1) + chunk_id: str = Field(min_length=1) + crs: str = Field(min_length=1) + meters_per_pixel: PositiveFloat + capture_date: date + expires_at: datetime + content_hash: str = Field(min_length=1) + expected_content_hash: str = Field(min_length=1) + sidecar_hash: str = Field(min_length=1) + expected_sidecar_hash: str = Field(min_length=1) + signature_hash: str = Field(min_length=1) + provenance: str = Field(min_length=1) + footprint: dict[str, float] + descriptor_ref: str = Field(min_length=1) + + +class TileValidationDecision(TileManagerModel): + tile_id: str = Field(min_length=1) + accepted: bool + reason: str + record: CacheTileRecord | None = None + + +class CacheValidationReport(TileManagerModel): + activated: bool + decisions: tuple[TileValidationDecision, ...] + + @property + def trusted_records(self) -> tuple[CacheTileRecord, ...]: + return tuple(decision.record for decision in self.decisions if decision.record is not None) + + +class TileMetadataLookup(TileManagerModel): + found: bool + record: CacheTileRecord | None = None + descriptor_ref: str | None = None + error: ErrorEnvelope | None = None + + +def freshness_status(expires_at: datetime, now: datetime) -> Literal["fresh", "stale"]: + normalized_expiry = expires_at + if normalized_expiry.tzinfo is None: + normalized_expiry = normalized_expiry.replace(tzinfo=timezone.utc) + normalized_now = now if now.tzinfo is not None else now.replace(tzinfo=timezone.utc) + return "fresh" if normalized_expiry >= normalized_now else "stale" diff --git a/tests/unit/test_camera_ingest_calibration.py b/tests/unit/test_camera_ingest_calibration.py new file mode 100644 index 0000000..deecfe2 --- /dev/null +++ b/tests/unit/test_camera_ingest_calibration.py @@ -0,0 +1,76 @@ +import pytest +from pydantic import ValidationError + +from camera_ingest_calibration import ( + CalibrationMetadata, + CameraFrameIngestor, + NavigationFrame, +) + + +def _calibration() -> CalibrationMetadata: + return CalibrationMetadata( + calibration_id="calib-front-1", + camera_model="global-shutter", + image_width_px=1920, + image_height_px=1080, + focal_length_px=840.0, + distortion_model="plumb_bob", + ) + + +def test_valid_frame_packet_contains_metadata_reports_and_normalization_hint() -> None: + # Arrange + frame = NavigationFrame( + frame_id="frame-1", + timestamp_ns=1_000, + image_ref="replay/frame-1.jpg", + mean_luma=0.7, + contrast=0.6, + north_up_degrees=12.5, + ) + + # Act + packet = CameraFrameIngestor().ingest(frame, _calibration()) + + # Assert + assert packet.contract.timestamp_ns == 1_000 + assert packet.contract.calibration_id == "calib-front-1" + assert packet.quality_report.state == "usable" + assert packet.occlusion_report.state == "clear" + assert packet.normalization_hint.should_normalize_downstream is True + + +def test_total_occlusion_marks_frame_unusable_for_vio_and_anchor() -> None: + # Arrange + frame = NavigationFrame( + frame_id="frame-blackout", + timestamp_ns=2_000, + image_ref="replay/frame-blackout.jpg", + mean_luma=0.01, + contrast=0.01, + ) + + # Act + packet = CameraFrameIngestor().ingest(frame, _calibration()) + + # Assert + assert packet.occlusion_report.state == "total" + assert packet.usable_for_vio is False + assert packet.usable_for_anchor is False + + +def test_raw_frame_payload_retention_is_rejected() -> None: + # Act + with pytest.raises(ValidationError) as error: + NavigationFrame( + frame_id="frame-raw", + timestamp_ns=3_000, + image_ref="replay/frame-raw.jpg", + mean_luma=0.7, + contrast=0.6, + raw_frame_retained=True, + ) + + # Assert + assert "references only" in str(error.value) diff --git a/tests/unit/test_fdr_observability.py b/tests/unit/test_fdr_observability.py new file mode 100644 index 0000000..ee35c7d --- /dev/null +++ b/tests/unit/test_fdr_observability.py @@ -0,0 +1,64 @@ +from shared.contracts import FdrEvent + +from fdr_observability import FdrExportRequest, FdrPayload, InMemoryFlightRecorder + + +def _event(event_type: str = "anchor") -> FdrEvent: + return FdrEvent( + event_type=event_type, + timestamp_ns=1_000, + component="anchor_verification", + severity="info", + payload_ref="pending", + mission_id="mission-1", + run_id="run-1", + ) + + +def test_valid_event_append_indexes_metadata_and_payload_reference() -> None: + # Arrange + recorder = InMemoryFlightRecorder(segment_limit_bytes=1_000, storage_limit_bytes=2_000) + payload = FdrPayload(ref="fdr://segments/1/payloads/anchor-1.cbor", size_bytes=128) + + # Act + result = recorder.append_event(_event(), payload) + + # Assert + assert result.appended is True + assert result.event is not None + assert result.event.payload_ref == payload.ref + assert result.segment_id == "segment-0001" + assert recorder.health.status == "ready" + + +def test_rollover_threshold_records_explicit_rollover_result() -> None: + # Arrange + recorder = InMemoryFlightRecorder(segment_limit_bytes=100, storage_limit_bytes=500) + recorder.append_event(_event("first"), FdrPayload(ref="fdr://payloads/1", size_bytes=80)) + + # Act + result = recorder.append_event( + _event("second"), FdrPayload(ref="fdr://payloads/2", size_bytes=50) + ) + + # Assert + assert result.appended is True + assert result.rollover is True + assert result.segment_id == "segment-0002" + + +def test_export_request_produces_queryable_evidence_artifacts() -> None: + # Arrange + recorder = InMemoryFlightRecorder(segment_limit_bytes=1_000, storage_limit_bytes=2_000) + recorder.append_event(_event(), FdrPayload(ref="fdr://payloads/1", size_bytes=128)) + + # Act + result = recorder.export( + FdrExportRequest(mission_id="mission-1", run_id="run-1", include_analytics=True) + ) + + # Assert + assert result.produced is True + assert result.evidence_ref == "fdr://exports/mission-1/run-1/evidence.json" + assert result.analytics_ref == "fdr://exports/mission-1/run-1/analytics.parquet" + assert result.segments[0].event_count == 1 diff --git a/tests/unit/test_mavlink_gcs_integration.py b/tests/unit/test_mavlink_gcs_integration.py new file mode 100644 index 0000000..b0de349 --- /dev/null +++ b/tests/unit/test_mavlink_gcs_integration.py @@ -0,0 +1,72 @@ +from shared.contracts import PositionEstimate + +from mavlink_gcs_integration import ( + FlightControllerTelemetry, + InMemoryMavlinkGateway, + OperatorStatusMessage, +) + + +def test_telemetry_subscription_emits_normalized_sample() -> None: + # Arrange + gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000) + telemetry = FlightControllerTelemetry( + timestamp_ns=1_000, + acceleration_mps2=(0.1, 0.2, -9.8), + attitude_rad=(0.01, 0.02, 1.57), + altitude_m=250.0, + airspeed_mps=17.5, + gps_health="lost", + ) + + # Act + samples = gateway.subscribe_telemetry([telemetry]) + + # Assert + assert len(samples) == 1 + assert samples[0].imu["accel_z"] == -9.8 + assert samples[0].attitude["yaw"] == 1.57 + assert samples[0].gps_health == "lost" + + +def test_invalid_gps_input_estimate_is_rejected_without_emission() -> None: + # Arrange + gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000) + estimate = PositionEstimate( + timestamp_ns=2_000, + latitude_deg=49.9, + longitude_deg=36.2, + altitude_m=250.0, + covariance_semimajor_m=10.0, + source_label="no_fix", + fix_type=1, + horizontal_accuracy_m=10.0, + anchor_age_ms=0, + ) + + # Act + result = gateway.emit_gps_input(estimate) + + # Assert + assert result.emitted is False + assert result.error is not None + assert result.error.category == "validation" + assert gateway.emitted_gps_inputs == [] + + +def test_operator_status_messages_are_rate_limited_by_text() -> None: + # Arrange + gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000) + messages = [ + OperatorStatusMessage(timestamp_ns=1_000, severity="warning", text="GPS denied"), + OperatorStatusMessage(timestamp_ns=1_500, severity="warning", text="GPS denied"), + OperatorStatusMessage(timestamp_ns=2_100, severity="warning", text="GPS denied"), + ] + + # Act + result = gateway.emit_status(messages) + + # Assert + assert [message.timestamp_ns for message in result.emitted] == [1_000, 2_100] + assert [message.timestamp_ns for message in result.suppressed] == [1_500] + assert len(gateway.emitted_status_messages) == 2 diff --git a/tests/unit/test_tile_manager.py b/tests/unit/test_tile_manager.py new file mode 100644 index 0000000..c800b54 --- /dev/null +++ b/tests/unit/test_tile_manager.py @@ -0,0 +1,78 @@ +from datetime import datetime, timezone + +from tile_manager import LocalTileManager, TileManifestEntry + +NOW = datetime(2026, 5, 3, tzinfo=timezone.utc) + + +def _entry(**overrides: object) -> TileManifestEntry: + payload: dict[str, object] = { + "tile_id": "tile-1", + "chunk_id": "chunk-1", + "crs": "EPSG:3857", + "meters_per_pixel": 0.3, + "capture_date": "2026-05-01", + "expires_at": "2026-06-01T00:00:00+00:00", + "content_hash": "sha256:tile", + "expected_content_hash": "sha256:tile", + "sidecar_hash": "sha256:sidecar", + "expected_sidecar_hash": "sha256:sidecar", + "signature_hash": "sig:trusted", + "provenance": "suite-satellite-service", + "footprint": {"min_lat": 49.0, "max_lat": 50.0}, + "descriptor_ref": "descriptors/chunk-1.vlad", + } + payload.update(overrides) + return TileManifestEntry.model_validate(payload) + + +def test_valid_cache_manifest_activates_trusted_records() -> None: + # Arrange + manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW) + + # Act + report = manager.validate_cache([_entry()]) + + # Assert + assert report.activated is True + assert report.decisions[0].accepted is True + assert report.trusted_records[0].trust_level == "trusted" + + +def test_tampered_or_stale_tile_is_rejected_with_auditable_reason() -> None: + # Arrange + manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW) + tampered = _entry(tile_id="tile-tampered", content_hash="sha256:bad") + stale = _entry( + tile_id="tile-stale", + chunk_id="chunk-stale", + expires_at="2026-05-01T00:00:00+00:00", + ) + + # Act + report = manager.validate_cache([tampered, stale]) + + # Assert + assert report.activated is False + assert [decision.reason for decision in report.decisions] == [ + "content_hash_mismatch", + "stale", + ] + + +def test_tile_metadata_lookup_returns_record_or_explicit_rejection() -> None: + # Arrange + manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW) + manager.validate_cache([_entry()]) + + # Act + found = manager.get_tile_metadata("chunk-1") + missing = manager.get_tile_metadata("missing") + + # Assert + assert found.found is True + assert found.record is not None + assert found.descriptor_ref == "descriptors/chunk-1.vlad" + assert missing.found is False + assert missing.error is not None + assert missing.error.category == "validation"