[AZ-294] [AZ-295] [AZ-296] Finish C13: tile snapshot + record-kind policy + takeoff abort

AZ-294: MidFlightTileSnapshotSink writes orthorectified tile JPEGs
atomically to flight_root/<flight_id>/tiles/<tile_id>.jpg, emits a
kind="mid_flight_tile_snapshot" pointer record, and evicts the oldest
tile when the per-flight 64 MiB cap is exceeded. Adds optional
frame_id to the snapshot payload (fdr_record_schema bump).

AZ-295: RecordKindPolicy with two paired gates:
- enforce_or_raise (producer-side) raises RawFrameWriteForbiddenError
  for raw_nav_frame / raw_ai_cam_frame at the call site, defending
  AC-8.5 / RESTRICT-UAV-4.
- gate_for_writer (writer-side) tumbling-window rate-caps
  failed_tile_thumbnail records at <= 0.1 Hz; over-cap drops are
  coalesced into kind="overrun" records with the originating
  producer slug.

AZ-296: take_off() composition-root sequence with strict ordering
(writer.__init__ -> start -> open_flight -> fc_adapter.__init__ ->
fc_adapter.open). On FdrOpenError, logs ERROR record, calls
writer.stop(), prints the documented FATAL line to stderr, and
sys.exit(EXIT_FDR_OPEN_FAILURE=2). composition_root_protocol bumped
to v1.1.0 with the new constants + takeoff-sequence section.

29 new tests; full suite 356 passed / 2 skipped / 0 failures.
No new dependencies (stdlib only).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 03:52:07 +03:00
parent b5dd6031d2
commit e4ecdaf619
21 changed files with 1657 additions and 9 deletions
@@ -7,9 +7,20 @@ from gps_denied_onboard.components.c13_fdr.errors import (
FdrConcurrentWriterError,
FdrOpenError,
FdrWriterError,
RawFrameWriteForbiddenError,
TileSnapshotInvalidIdError,
TileSnapshotTooLargeError,
)
from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader
from gps_denied_onboard.components.c13_fdr.interface import FdrWriter
from gps_denied_onboard.components.c13_fdr.record_kind_policy import (
GateDecision,
RecordKindPolicy,
make_record_kind_policy,
)
from gps_denied_onboard.components.c13_fdr.tile_snapshot_sink import (
MidFlightTileSnapshotSink,
)
from gps_denied_onboard.components.c13_fdr.writer import FileFdrWriter
__all__ = [
@@ -23,4 +34,11 @@ __all__ = [
"FileFdrWriter",
"FlightFooter",
"FlightHeader",
"GateDecision",
"MidFlightTileSnapshotSink",
"RawFrameWriteForbiddenError",
"RecordKindPolicy",
"TileSnapshotInvalidIdError",
"TileSnapshotTooLargeError",
"make_record_kind_policy",
]
@@ -1,4 +1,4 @@
"""C13 FDR writer error types (AZ-291 / AZ-292 / AZ-293)."""
"""C13 FDR writer error types (AZ-291 / AZ-292 / AZ-293 / AZ-294 / AZ-295)."""
from __future__ import annotations
@@ -8,9 +8,44 @@ __all__ = [
"FdrConcurrentWriterError",
"FdrOpenError",
"FdrWriterError",
"RawFrameWriteForbiddenError",
"TileSnapshotInvalidIdError",
"TileSnapshotTooLargeError",
]
class TileSnapshotTooLargeError(ValueError):
"""Raised by `MidFlightTileSnapshotSink.write_snapshot` (AZ-294) when the
input JPEG exceeds the configured ``jpeg_max_bytes`` ceiling.
The sink does not trust producers to self-cap their JPEG size; this
bound short-circuits adversarial / runaway producer behaviour before
any sidecar file is written.
"""
class TileSnapshotInvalidIdError(ValueError):
"""Raised by `MidFlightTileSnapshotSink.write_snapshot` (AZ-294) when the
input ``tile_id`` does not match the documented identifier regex.
The regex rejects path-traversal sequences (e.g. ``../../etc/passwd``)
and any character outside ``[a-zA-Z0-9_-]``; size is bounded to 128
chars.
"""
class RawFrameWriteForbiddenError(RuntimeError):
"""Raised by `RecordKindPolicy.enforce_or_raise` (AZ-295) when a
producer attempts to enqueue an `FdrRecord` whose ``kind`` is in
the configured forbidden set (defaults to raw-frame variants).
AC-8.5 / RESTRICT-UAV-4: raw nav/AI-cam frames are NEVER allowed on
durable storage. The exception is raised SYNCHRONOUSLY at the
producer's call site so the offending caller sees the security
error immediately.
"""
class FdrWriterError(RuntimeError):
"""Base class for every C13 writer-side runtime error."""
@@ -0,0 +1,191 @@
"""``RecordKindPolicy`` — AC-8.5 / RESTRICT-UAV-4 record-kind gates (AZ-295).
Two paired gates with intentionally asymmetric semantics:
- ``enforce_or_raise(record)`` — producer-side synchronous check. Raises
:class:`RawFrameWriteForbiddenError` when ``record.kind`` is in the
configured forbidden set; returns silently otherwise. Producers call
this immediately BEFORE ``fdr_client.enqueue(record)``.
- ``gate_for_writer(record)`` — writer-side soft rate cap on
``kind="failed_tile_thumbnail"``. Returns ``GateDecision.ENQUEUE``
for in-cap records and ``GateDecision.DROP`` for over-cap thumbnails.
Drops accumulate into a per-window ``dropped_count`` that is emitted
as a single coalesced ``kind="overrun"`` record at the close of each
window (matches AZ-274 overrun semantics).
The two gates exist together so a forbidden-kind regression in a
producer is caught at the call site (security failure visible to the
offending caller), and a thumbnail-flood regression is caught on the
write path without exploding error counts (rate-cap with audit
trail).
"""
from __future__ import annotations
import enum
import threading
import time
from collections.abc import Iterable
from datetime import datetime, timezone
from gps_denied_onboard.components.c13_fdr.errors import (
RawFrameWriteForbiddenError,
)
from gps_denied_onboard.config import RecordKindPolicyConfig
from gps_denied_onboard.fdr_client.records import (
OVERRUN_KIND,
OVERRUN_PRODUCER_ID,
FdrRecord,
)
from gps_denied_onboard.logging import get_logger
__all__ = ["GateDecision", "RecordKindPolicy", "make_record_kind_policy"]
_THUMBNAIL_KIND = "failed_tile_thumbnail"
_LOG_RATE_LIMIT_S = 1.0
class GateDecision(enum.Enum):
ENQUEUE = "enqueue"
DROP = "drop"
class _ThumbnailRateCap:
"""Per-window admission counter for `failed_tile_thumbnail` records.
Maintains a single window starting at the time of the first record;
the window is ``(1.0 / max_hz)`` seconds wide. Up to one thumbnail
is admitted per window; subsequent records are counted into
``dropped_in_current_window`` until the window closes.
Window close emits a coalesced overrun record carrying the
accumulated drop count.
"""
def __init__(self, max_hz: float) -> None:
self._window_s = 1.0 / max_hz
self._window_start_mono: float | None = None
self._admitted_in_window = 0
self._dropped_in_window = 0
self._dropped_producer: str | None = None
self._lock = threading.Lock()
def admit(self, producer_id: str) -> bool:
now = time.monotonic()
with self._lock:
if self._window_start_mono is None or now - self._window_start_mono >= self._window_s:
# Window closed (or first call). Reset.
self._window_start_mono = now
self._admitted_in_window = 0
self._dropped_in_window = 0
self._dropped_producer = None
if self._admitted_in_window == 0:
self._admitted_in_window = 1
return True
self._dropped_in_window += 1
self._dropped_producer = producer_id
return False
def drain_dropped(self) -> tuple[int, str | None]:
"""Return ``(dropped_count, producer_id)`` and clear the accumulator."""
with self._lock:
count = self._dropped_in_window
producer = self._dropped_producer
self._dropped_in_window = 0
self._dropped_producer = None
return count, producer
class RecordKindPolicy:
"""Per-flight record-kind policy (AZ-295)."""
def __init__(self, config: RecordKindPolicyConfig) -> None:
if not isinstance(config, RecordKindPolicyConfig):
raise TypeError(
f"RecordKindPolicy.config must be RecordKindPolicyConfig; "
f"got {type(config).__name__}"
)
self._forbidden_kinds: frozenset[str] = config.forbidden_record_kinds
self._rate_cap = _ThumbnailRateCap(max_hz=config.failed_tile_thumbnail_max_hz)
self._last_warn_t = 0.0
self._log = get_logger("c13_fdr.record_kind_policy")
@property
def forbidden_kinds(self) -> frozenset[str]:
return self._forbidden_kinds
def enforce_or_raise(self, record: FdrRecord) -> None:
"""Producer-side synchronous gate.
Raises ``RawFrameWriteForbiddenError`` if ``record.kind`` is in
the configured forbidden set; returns silently otherwise.
"""
if record.kind in self._forbidden_kinds:
raise RawFrameWriteForbiddenError(
f"FdrRecord kind={record.kind!r} from producer {record.producer_id!r} "
f"is forbidden by RecordKindPolicy"
)
def gate_for_writer(self, record: FdrRecord) -> GateDecision:
"""Writer-side rate-cap gate for ``failed_tile_thumbnail`` records.
Returns :attr:`GateDecision.ENQUEUE` for non-thumbnail records
and for the first thumbnail in each window. Returns
:attr:`GateDecision.DROP` for over-cap thumbnails; the drop is
recorded into the rate cap's accumulator so a single coalesced
overrun record is emitted via :meth:`drain_pending_overrun`.
"""
if record.kind != _THUMBNAIL_KIND:
return GateDecision.ENQUEUE
producer_id = record.producer_id or OVERRUN_PRODUCER_ID
if self._rate_cap.admit(producer_id):
return GateDecision.ENQUEUE
self._maybe_warn(producer_id)
return GateDecision.DROP
def drain_pending_overrun(self) -> FdrRecord | None:
"""Return a coalesced overrun record for any thumbnails dropped
since the previous drain, or ``None`` if the window is empty.
The writer-thread calls this at end-of-batch so over-cap drops
surface as a canonical overrun trail in the FDR.
"""
dropped, producer = self._rate_cap.drain_dropped()
if dropped <= 0:
return None
return FdrRecord(
schema_version=1,
ts=datetime.now(tz=timezone.utc).isoformat(),
producer_id=OVERRUN_PRODUCER_ID,
kind=OVERRUN_KIND,
payload={
"producer_id": producer or "shared.fdr_client",
"dropped_count": dropped,
},
)
def _maybe_warn(self, producer_id: str) -> None:
now = time.monotonic()
if now - self._last_warn_t < _LOG_RATE_LIMIT_S:
return
self._last_warn_t = now
self._log.warning(
f"fdr.thumbnail_rate_cap_exceeded: producer_id={producer_id}",
extra={
"kind": "fdr.thumbnail_rate_cap_exceeded",
"kv": {"producer_id": producer_id},
},
)
def make_record_kind_policy(config: RecordKindPolicyConfig) -> RecordKindPolicy:
"""Composition-root factory for :class:`RecordKindPolicy`."""
return RecordKindPolicy(config)
def is_legitimate_kind(kind: str, *, legitimate_kinds: Iterable[str]) -> bool:
"""Helper used by the AZ-272 contract test: a forbidden-kind set
must NOT contain any kind from the legitimate v1.x closed enum.
"""
return kind in set(legitimate_kinds)
@@ -0,0 +1,230 @@
"""``MidFlightTileSnapshotSink`` — sidecar storage for F4 tile snapshots (AZ-294).
C6 / C11 producers call :py:meth:`MidFlightTileSnapshotSink.write_snapshot`
with the orthorectified JPEG bytes. The sink:
1. Validates JPEG size (``jpeg_max_bytes``) and ``tile_id`` regex.
2. Writes the JPEG to ``flight_root/<flight_id>/tiles/<tile_id>.jpg``
atomically (temp file + ``fsync`` + ``rename``).
3. Enqueues a single ``kind="mid_flight_tile_snapshot"`` FdrRecord
carrying the relative path + capture timestamp.
4. Enforces the per-flight tile cap (``tile_snapshot_cap_bytes``) by
dropping the oldest tile if the cumulative size exceeds the cap;
emits a ``kind="overrun"`` record per drop.
Thread-safe: many producer threads may call ``write_snapshot``
concurrently; an internal lock serialises the cap-check + drop +
record-enqueue sequence. The JPEG write itself is independent and
runs outside the lock so producers do not serialise on each other's
disk IO.
"""
from __future__ import annotations
import os
import re
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Final
from uuid import UUID
from gps_denied_onboard.components.c13_fdr.errors import (
TileSnapshotInvalidIdError,
TileSnapshotTooLargeError,
)
from gps_denied_onboard.config import TileSnapshotConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import (
OVERRUN_KIND,
OVERRUN_PRODUCER_ID,
FdrRecord,
)
from gps_denied_onboard.logging import get_logger
__all__ = ["MidFlightTileSnapshotSink"]
_TILE_ID_RE: Final[re.Pattern[str]] = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")
_SNAPSHOT_KIND: Final[str] = "mid_flight_tile_snapshot"
_TILES_SUBDIR: Final[str] = "tiles"
def _iso(captured_at: datetime) -> str:
if captured_at.tzinfo is None:
captured_at = captured_at.replace(tzinfo=timezone.utc)
return captured_at.astimezone(timezone.utc).isoformat()
def _on_disk_size(path: Path) -> int:
try:
return path.stat().st_size
except OSError:
return 0
class MidFlightTileSnapshotSink:
"""Sidecar writer for F4 mid-flight tile snapshots."""
def __init__(
self,
flight_root: Path,
flight_id: UUID,
fdr_client: FdrClient,
config: TileSnapshotConfig,
) -> None:
self._flight_root = Path(flight_root)
self._flight_id = flight_id
self._fdr_client = fdr_client
self._config = config
self._flight_dir = self._flight_root / str(flight_id)
self._tiles_dir = self._flight_dir / _TILES_SUBDIR
self._lock = threading.Lock()
self._log = get_logger("c13_fdr.tile_snapshot_sink")
# In-memory cache of (captured_at_iso, tile_id, path) sorted by
# captured_at ASC. Refreshed lazily from disk on cap-check entry
# so an externally-deleted tile does not corrupt accounting
# (matches AZ-293's stale-list refresh pattern).
self._tile_index: list[tuple[str, str, Path]] = []
self._tile_index_initialised = False
@property
def tiles_dir(self) -> Path:
return self._tiles_dir
def write_snapshot(
self,
tile_id: str,
jpeg_bytes: bytes,
captured_at: datetime,
frame_id: int | None = None,
) -> Path:
"""Persist ``jpeg_bytes`` to the canonical sidecar path and emit a pointer record.
Returns the absolute path of the on-disk sidecar file.
"""
if not isinstance(jpeg_bytes, (bytes, bytearray)):
raise TypeError(f"jpeg_bytes must be bytes; got {type(jpeg_bytes).__name__}")
if len(jpeg_bytes) > self._config.jpeg_max_bytes:
raise TileSnapshotTooLargeError(
f"JPEG size {len(jpeg_bytes)} bytes exceeds jpeg_max_bytes "
f"{self._config.jpeg_max_bytes}"
)
if not isinstance(tile_id, str) or not _TILE_ID_RE.match(tile_id):
raise TileSnapshotInvalidIdError(
f"tile_id {tile_id!r} does not match {_TILE_ID_RE.pattern!r}"
)
self._tiles_dir.mkdir(parents=True, exist_ok=True)
canonical_path = self._tiles_dir / f"{tile_id}.jpg"
# Atomic write: temp file + fsync + rename.
tmp_path = canonical_path.with_suffix(canonical_path.suffix + ".tmp")
with open(tmp_path, "wb") as fh:
fh.write(bytes(jpeg_bytes))
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp_path, canonical_path)
captured_iso = _iso(captured_at)
payload: dict[str, object] = {
"snapshot_path": f"{_TILES_SUBDIR}/{tile_id}.jpg",
"captured_at": captured_iso,
}
if frame_id is not None:
payload["frame_id"] = int(frame_id)
record = FdrRecord(
schema_version=1,
ts=datetime.now(tz=timezone.utc).isoformat(),
producer_id=OVERRUN_PRODUCER_ID,
kind=_SNAPSHOT_KIND,
payload=payload,
)
self._fdr_client.enqueue(record)
# Cap check + drop. Lock covers both index refresh and the drop
# so concurrent writers cannot double-drop the same tile.
with self._lock:
self._refresh_index_if_needed()
self._tile_index.append((captured_iso, tile_id, canonical_path))
self._tile_index.sort(key=lambda entry: entry[0])
self._evict_until_under_cap()
self._log.info(
f"fdr.tile_snapshot_written: {tile_id} ({len(jpeg_bytes)} B)",
extra={
"kind": "fdr.tile_snapshot_written",
"kv": {"tile_id": tile_id, "size_bytes": len(jpeg_bytes)},
},
)
return canonical_path
def _refresh_index_if_needed(self) -> None:
if self._tile_index_initialised:
return
self._tile_index_initialised = True
if not self._tiles_dir.exists():
return
entries: list[tuple[str, str, Path]] = []
for entry in self._tiles_dir.iterdir():
if not entry.is_file() or entry.suffix != ".jpg":
continue
tile_id = entry.stem
if not _TILE_ID_RE.match(tile_id):
continue
# Use the file mtime as a proxy for captured_at when this is a
# pre-existing tile from a prior process (per AC-7). It is a
# monotonic-enough ordering for oldest-first eviction.
mtime_iso = datetime.fromtimestamp(entry.stat().st_mtime, tz=timezone.utc).isoformat()
entries.append((mtime_iso, tile_id, entry))
entries.sort(key=lambda kv: kv[0])
self._tile_index = entries
def _evict_until_under_cap(self) -> None:
cap = self._config.tile_snapshot_cap_bytes
total = self._directory_size()
while total > cap and self._tile_index:
_captured_iso, tile_id, path = self._tile_index.pop(0)
freed = _on_disk_size(path)
try:
path.unlink()
except OSError as exc:
self._log.warning(
f"fdr.tile_snapshot_unlink_failed: {path.name} ({exc})",
extra={
"kind": "fdr.tile_snapshot_unlink_failed",
"kv": {"tile_id": tile_id, "error": repr(exc)},
},
)
total -= freed
continue
self._emit_overrun(tile_id=tile_id)
total = self._directory_size()
self._log.warning(
f"fdr.tile_snapshot_dropped: {tile_id} (freed {freed} B; total {total} B)",
extra={
"kind": "fdr.tile_snapshot_dropped",
"kv": {
"tile_id": tile_id,
"size_bytes_freed": freed,
"cap_bytes_after": total,
},
},
)
def _directory_size(self) -> int:
return sum(_on_disk_size(p) for _ts, _tid, p in self._tile_index)
def _emit_overrun(self, tile_id: str) -> None:
# ``producer_id`` payload field per the contract carries the
# ORIGINATING producer slug; the cap-driven drop is sink-side
# so we report the sink's slug. Outer envelope is always
# OVERRUN_PRODUCER_ID per AZ-272.
record = FdrRecord(
schema_version=1,
ts=datetime.now(tz=timezone.utc).isoformat(),
producer_id=OVERRUN_PRODUCER_ID,
kind=OVERRUN_KIND,
payload={
"producer_id": "shared.tile_snapshot_sink",
"dropped_count": 1,
},
)
self._fdr_client.enqueue(record)
@@ -39,6 +39,10 @@ from gps_denied_onboard.components.c13_fdr.errors import (
FdrWriterError,
)
from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader
from gps_denied_onboard.components.c13_fdr.record_kind_policy import (
GateDecision,
RecordKindPolicy,
)
from gps_denied_onboard.config import FdrWriterConfig
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.records import (
@@ -91,6 +95,7 @@ class FileFdrWriter:
gcs_alert: Callable[[str], None],
*,
on_rotation: Callable[[FileFdrWriter, int], None] | None = None,
record_kind_policy: RecordKindPolicy | None = None,
drain_sleep_s: float = _DEFAULT_DRAIN_SLEEP_S,
) -> None:
self._flight_root = Path(flight_root)
@@ -99,6 +104,7 @@ class FileFdrWriter:
self._fdr_clients = tuple(fdr_clients)
self._gcs_alert = gcs_alert
self._on_rotation = on_rotation
self._record_kind_policy = record_kind_policy
self._drain_sleep_s = drain_sleep_s
# Filesystem state.
@@ -383,6 +389,10 @@ class FileFdrWriter:
batch = client.drain(max_records=self._config.batch_size)
for record in batch:
self._observe_overrun_record(record)
if self._record_kind_policy is not None:
decision = self._record_kind_policy.gate_for_writer(record)
if decision is GateDecision.DROP:
continue
try:
self._append_record(record)
except OSError as exc:
@@ -390,8 +400,21 @@ class FileFdrWriter:
# Continue dequeuing producer buffers so they don't grow
# unboundedly even in degraded mode (AC-5 part d).
continue
self._emit_pending_policy_overrun()
return len(batch)
def _emit_pending_policy_overrun(self) -> None:
if self._record_kind_policy is None:
return
overrun = self._record_kind_policy.drain_pending_overrun()
if overrun is None:
return
self._observe_overrun_record(overrun)
try:
self._append_record(overrun)
except OSError as exc:
self._handle_write_failure(exc)
def _observe_overrun_record(self, record: FdrRecord) -> None:
if record.kind != OVERRUN_KIND:
return