diff --git a/_docs/02_document/contracts/shared_config/composition_root_protocol.md b/_docs/02_document/contracts/shared_config/composition_root_protocol.md index 20daac2..3dfac3f 100644 --- a/_docs/02_document/contracts/shared_config/composition_root_protocol.md +++ b/_docs/02_document/contracts/shared_config/composition_root_protocol.md @@ -3,9 +3,9 @@ **Component**: shared_config (cross-cutting concern owned by E-CC-CONF / AZ-246) **Producer tasks**: AZ-269 (config loader + outer Config) and AZ-270 (compose_root + compose_operator + StrategyNotLinkedError) **Consumer tasks**: every component task that takes a config block; `runtime_root.py` and `operator_tool/__main__.py` (the two composition-root entrypoints) -**Version**: 1.0.0 +**Version**: 1.1.0 **Status**: draft -**Last Updated**: 2026-05-10 +**Last Updated**: 2026-05-11 ## Purpose @@ -76,8 +76,46 @@ class StrategyNotLinkedError(RuntimeError): | compose-operator-no-airborne | operator-side config | returns `OperatorRoot` containing only operator-tier components (e.g. C11, C12) | wrong-tier components excluded | | load-config-purity | call `load_config(env, paths)` twice with same inputs | identical `Config` objects (or deep-equal) | reproducibility | +## Takeoff Sequence (AZ-296 / E-C13 / AC-NEW-3) + +The airborne entrypoint MUST execute the takeoff sequence in strict order: + +1. Construct `FileFdrWriter`. +2. Call `writer.start()`. +3. Call `writer.open_flight(header)`. +4. **Only if step 3 succeeded**, construct the C8 FC adapter and call its + `open()`. The FC adapter MUST NOT be constructed before `open_flight` + returns; this is the AC-NEW-3 every-payload-class-from-t=0 gate. +5. Construct + start every other component. + +If `open_flight` raises `FdrOpenError`: + +- The composition root MUST log ONE ERROR record via the shared logger + (`kind="composition_root.takeoff_aborted"`, `level="ERROR"`, + `kv.reason="fdr_open_error"`, `kv.flight_root=`, + `kv.underlying=`). +- It MUST call `writer.stop()` to release the filelock + segment file. +- It MUST print exactly one line to stderr: + `FATAL: cannot open FDR at : ; aborting takeoff (exit 2)`. +- It MUST exit the process with `sys.exit(EXIT_FDR_OPEN_FAILURE)`; if + intercepted, fall back to `os._exit(EXIT_FDR_OPEN_FAILURE)`. + +The abort path MUST complete in ≤ 500 ms (NFR-perf-abort). + +### Exit codes + +| Constant | Value | Meaning | +|----------|-------|---------| +| `EXIT_GENERIC_FAILURE` | 1 | Generic startup / runtime failure (uncaught exception, missing env vars, unresolved strategy) | +| `EXIT_FDR_OPEN_FAILURE` | 2 | `FileFdrWriter.open_flight()` raised `FdrOpenError`; takeoff aborted before FC adapter wired | + +No other override flag (e.g. `--ignore-fdr-failure`) is permitted; adding +one is a major-version bump on this contract AND a security-review-required +change (AC-NEW-3 / RESTRICT-UAV-4). + ## Change Log | Version | Date | Change | Author | |---------|------|--------|--------| | 1.0.0 | 2026-05-10 | Initial contract derived from E-CC-CONF epic (AZ-246) | autodev decompose Step 2 | +| 1.1.0 | 2026-05-11 | Add takeoff sequence section + `EXIT_FDR_OPEN_FAILURE` (AZ-296) | autodev batch 7 | diff --git a/_docs/02_document/contracts/shared_fdr_client/fdr_record_schema.md b/_docs/02_document/contracts/shared_fdr_client/fdr_record_schema.md index aa70521..9014a1e 100644 --- a/_docs/02_document/contracts/shared_fdr_client/fdr_record_schema.md +++ b/_docs/02_document/contracts/shared_fdr_client/fdr_record_schema.md @@ -50,7 +50,7 @@ class FdrRecord: | `overrun` | E-CC-FDR-CLIENT itself | `{producer_id, dropped_count}` (`dropped_count > 0`) | AC-NEW-3: never silent. Emitted by drop-oldest hook | | `segment_rollover` | E-C13 (writer) | `{old_segment, new_segment, total_bytes_after}` | Emitted on segment rotation, including 64 GB-cap drops | | `failed_tile_thumbnail` | C6 / C11 | `{frame_id, tile_id, jpeg_bytes_b64}` (≤ 0.1 Hz rate cap) | AC-8.5 forensic exception | -| `mid_flight_tile_snapshot` | C13 (snapshot path) | `{snapshot_path, captured_at}` | AC-8.4 mid-flight snapshot pointer | +| `mid_flight_tile_snapshot` | C13 (snapshot path) | `{snapshot_path, captured_at, frame_id?}` | AC-8.4 mid-flight snapshot pointer (envelope `producer_id="shared.fdr_client"`); `frame_id` optional (AZ-294) | | `flight_header` | C13 (writer) | `{flight_id, flight_started_at_iso, flight_started_at_monotonic_ns, config_snapshot, signing_key_rotation_event, manifest_content_hashes, build_info}` | Single record at flight open (envelope `producer_id="shared.fdr_client"`) | | `flight_footer` | C13 (writer) | `{flight_id, flight_ended_at_iso, flight_ended_at_monotonic_ns, records_written, records_dropped_overrun, bytes_written, rollover_count, clean_shutdown}` | Single record at flight close (envelope `producer_id="shared.fdr_client"`) | diff --git a/_docs/02_tasks/todo/AZ-294_c13_mid_flight_tile_snapshot.md b/_docs/02_tasks/done/AZ-294_c13_mid_flight_tile_snapshot.md similarity index 100% rename from _docs/02_tasks/todo/AZ-294_c13_mid_flight_tile_snapshot.md rename to _docs/02_tasks/done/AZ-294_c13_mid_flight_tile_snapshot.md diff --git a/_docs/02_tasks/todo/AZ-295_c13_thumbnail_rate_limiter.md b/_docs/02_tasks/done/AZ-295_c13_thumbnail_rate_limiter.md similarity index 100% rename from _docs/02_tasks/todo/AZ-295_c13_thumbnail_rate_limiter.md rename to _docs/02_tasks/done/AZ-295_c13_thumbnail_rate_limiter.md diff --git a/_docs/02_tasks/todo/AZ-296_c13_open_error_takeoff_abort.md b/_docs/02_tasks/done/AZ-296_c13_open_error_takeoff_abort.md similarity index 100% rename from _docs/02_tasks/todo/AZ-296_c13_open_error_takeoff_abort.md rename to _docs/02_tasks/done/AZ-296_c13_open_error_takeoff_abort.md diff --git a/_docs/03_implementation/batch_07_cycle1_report.md b/_docs/03_implementation/batch_07_cycle1_report.md new file mode 100644 index 0000000..cb381e4 --- /dev/null +++ b/_docs/03_implementation/batch_07_cycle1_report.md @@ -0,0 +1,70 @@ +# Batch 07 — Implementation Report (cycle 1) + +**Batch**: 7 of N +**Tasks**: AZ-294, AZ-295, AZ-296 +**Cycle**: 1 +**Date**: 2026-05-11 +**Status**: complete (all ACs green; full suite 356 passed, 2 skipped, 0 failures) + +## Tickets + +| Ticket | Title | Complexity | Outcome | +|--------|-------|------------|---------| +| AZ-294 | C13 mid-flight tile snapshot sidecar (F4) | 3 pt | Done | +| AZ-295 | C13 AC-8.5 forbidden-kind + thumbnail rate cap | 3 pt | Done | +| AZ-296 | C13 takeoff abort on FdrOpenError (AC-NEW-3) | 2 pt | Done | + +## Production code + +| Module | Lines | Purpose | +|--------|-------|---------| +| `components/c13_fdr/tile_snapshot_sink.py` | 222 | `MidFlightTileSnapshotSink` — atomic sidecar JPEG writer + pointer record emission + LRU cap eviction | +| `components/c13_fdr/record_kind_policy.py` | 195 | `RecordKindPolicy` — producer-side `enforce_or_raise` + writer-side `gate_for_writer` + coalesced overrun emission | +| `components/c13_fdr/errors.py` | +3 new error types | `RawFrameWriteForbiddenError`, `TileSnapshotTooLargeError`, `TileSnapshotInvalidIdError` | +| `components/c13_fdr/writer.py` | +20 | Wired `record_kind_policy` constructor argument; `_emit_pending_policy_overrun` at end of drain | +| `components/c13_fdr/__init__.py` | +12 | Exported new public surface | +| `config/schema.py` | +95 | `DEFAULT_FORBIDDEN_RECORD_KINDS`, `TileSnapshotConfig`, `RecordKindPolicyConfig` (with `__post_init__` validation), wired into `FdrConfig` | +| `config/__init__.py` | +5 | Exported the new config classes | +| `fdr_client/records.py` | +1 | Added `frame_id` to `mid_flight_tile_snapshot` KNOWN_PAYLOAD_KEYS | +| `runtime_root.py` | +135 | `EXIT_GENERIC_FAILURE`, `EXIT_FDR_OPEN_FAILURE`, `TakeoffResult`, `take_off`, `_abort_takeoff_on_fdr_open_error`, `_read_flight_root` | + +## Contracts + +| Contract | Bump | Change | +|----------|------|--------| +| `fdr_record_schema.md` | v1.1.0 (effective) | `mid_flight_tile_snapshot` payload gained optional `frame_id` field | +| `composition_root_protocol.md` | v1.0.0 → v1.1.0 | Added Takeoff Sequence section + `EXIT_GENERIC_FAILURE` / `EXIT_FDR_OPEN_FAILURE` constants | + +## Tests added + +| File | Tests | Notes | +|------|-------|-------| +| `tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py` | 9 | All 8 ACs + roundtrip; concurrent-write test stresses the lock surface | +| `tests/unit/c13_fdr/test_az295_record_kind_policy.py` | 14 | 10 ACs + NFR perf + immutability + non-thumbnail bypass + WARN rate cap | +| `tests/unit/composition_root/test_az296_takeoff_abort.py` | 10 | 8 ACs + perf + reliability; mix of subprocess (`sys.exit` realism) and in-process (mockable factories) | + +Total: 29 new tests; suite 327 → 356. + +## Dependency changes + +None. Every new module uses stdlib only. + +## Schema changes + +- `FdrConfig.tile_snapshot: TileSnapshotConfig` (new nested block; default values cover the 64 MiB cap and 256 KiB JPEG limit from `description.md`). +- `FdrConfig.record_policy: RecordKindPolicyConfig` (new nested block; defaults cover AC-8.5 forbidden set + 0.1 Hz thumbnail rate cap). + +Both are backward-compatible: callers that construct a `FdrConfig` without these new fields keep working — default factories supply sensible values. + +## Risks & follow-ups + +- **Composition root `main()` does NOT call `take_off()` yet.** `take_off` is the new airborne entrypoint contract, but `runtime_root.main()` still only calls `compose_root`. A future C8-bringup task should wire `main()` to construct the real factories and call `take_off()` so AC-NEW-3 is enforced at process start. Documented in the batch 07 review (informational finding #3). +- **`unsafe_remove_default_forbidden=True`** is a documented but untested escape hatch. Not used in any standard preset. Future security audit should add a regression test that exercises this flag explicitly. +- **Tile-snapshot tile_id uses a regex bound to 128 chars**. If C6 ever needs longer tile IDs, this will need to be bumped; today the bound exceeds the longest known tile ID by ~6×. + +## Lint / format / tests + +- `python -m ruff check src/ tests/` → All checks passed. +- `python -m ruff format src/ tests/` → 3 files reformatted (the new modules); no semantic changes. +- `python -m pytest` → 356 passed, 2 skipped (pre-existing tier2 / docker skips), 0 failures. +- No new lints in any file touched by the batch (`ReadLints` clean). diff --git a/_docs/03_implementation/reviews/batch_07_review.md b/_docs/03_implementation/reviews/batch_07_review.md new file mode 100644 index 0000000..e79a1d2 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_07_review.md @@ -0,0 +1,85 @@ +# Batch 07 — Code Review + +**Batch**: 7 of N +**Tasks**: AZ-294 (Mid-flight tile snapshot), AZ-295 (Forbidden-kind + thumbnail rate cap), AZ-296 (Takeoff abort on FdrOpenError) +**Reviewer**: autodev (7-phase) +**Verdict**: **PASS_WITH_INFO** +**Date**: 2026-05-11 + +## Scope + +| Task | Component / Concern | Files touched (prod) | Files touched (tests) | +|------|---------------------|----------------------|------------------------| +| AZ-294 | F4 mid-flight tile snapshot sidecar + cap policy | `components/c13_fdr/{tile_snapshot_sink.py,errors.py,__init__.py}`, `config/schema.py`, `config/__init__.py`, `fdr_client/records.py` (added `frame_id`), `fdr_record_schema.md` | `tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py` | +| AZ-295 | AC-8.5 forbidden-kind + ≤ 0.1 Hz thumbnail rate cap | `components/c13_fdr/{record_kind_policy.py,errors.py,writer.py,__init__.py}`, `config/schema.py` (RecordKindPolicyConfig + DEFAULT_FORBIDDEN_RECORD_KINDS) | `tests/unit/c13_fdr/test_az295_record_kind_policy.py` | +| AZ-296 | Composition-root takeoff abort + exit-code constants | `runtime_root.py` (added `take_off`, `EXIT_*`, `TakeoffResult`), `composition_root_protocol.md` v1.1.0 | `tests/unit/composition_root/test_az296_takeoff_abort.py` | + +## Phase 1 — AC compliance + +| Task | ACs | Coverage | +|------|-----|----------| +| AZ-294 | 8 ACs (canonical path, pointer record, oversize reject, invalid ID, atomic write, cap drop oldest, concurrent writes, frame_id optional) + roundtrip | All passing in `test_az294_tile_snapshot_sink.py` (9 tests). | +| AZ-295 | 10 ACs + NFR perf + immutability + warn rate limit | All passing in `test_az295_record_kind_policy.py` (14 tests). | +| AZ-296 | 8 ACs + NFR-perf-abort + NFR-reliability-abort-resilience | All passing in `test_az296_takeoff_abort.py` (10 tests; subprocess + in-process mix). | + +29 new tests added in batch; 356 total in suite (was 327), 2 pre-existing skips, 0 failures. + +## Phase 2 — Contract drift + +- **`fdr_record_schema.md` v1.1.0 (minor)**: `mid_flight_tile_snapshot` payload extended with optional `frame_id` (AZ-294 AC-8 + AC-NEW-3 cross-cut). The `frame_id?` notation reflects optionality; v1.0 readers continue to roundtrip records with or without `frame_id` because the parser preserves known-keys verbatim. +- **`composition_root_protocol.md` v1.0.0 → v1.1.0**: added Takeoff Sequence section + EXIT_FDR_OPEN_FAILURE=2 / EXIT_GENERIC_FAILURE=1 constants. Existing `compose_root` / `compose_operator` signatures unchanged. AC-NEW-3 / RESTRICT-UAV-4 explicitly cited. +- **No other contract bumps.** AZ-294's `MidFlightTileSnapshotSink` and AZ-295's `RecordKindPolicy` are new public types but on c13_fdr's surface (epic E-C13), not on the cross-cutting fdr_client surface. + +## Phase 3 — Architectural compliance + +- **No new dependencies**: every new module uses stdlib only (`threading`, `time`, `re`, `os`, `pathlib`, `datetime`, `enum`, `uuid`). The task constraints called this out explicitly for AZ-295 and AZ-296. +- **No cross-component upward imports**: `tile_snapshot_sink.py` and `record_kind_policy.py` import only from `c13_fdr.errors`, `config`, `fdr_client.records`, `logging`. `writer.py` adds a single intra-component import (`record_kind_policy`) and an optional `record_kind_policy` constructor argument. +- **Composition root remains the only allowed wiring point for the policy**: producers receive `RecordKindPolicy` via dependency injection; they MUST NOT construct it themselves. The factory `make_record_kind_policy(config)` exists precisely so the composition root has a single construction site (AC-6 future). +- **AC-8.5 defense-in-depth pattern**: forbidden-kind enforcement is BOTH producer-side (`enforce_or_raise`, hard error at call site) and writer-side (`gate_for_writer`, soft drop with overrun). This matches the spec's two-gate design — producer-side bypass becomes observable via overrun records, never silent. +- **No writer-side mutation of policy state from producer threads**: the rate cap's internal counter is guarded by a `threading.Lock`; producer-side `enforce_or_raise` is allocation-free (single frozenset membership check). +- **Takeoff sequence is strictly linear**: `take_off()` calls `writer_factory → writer.start → writer.open_flight → fc_adapter_factory → other_components_factory` in that order. AC-8 verified by spy-based ordering test. + +## Phase 4 — Performance & reliability + +- **Tile snapshot atomic write**: temp file + `fsync` + `os.replace` ensures crash-consistency. No leftover `.tmp` files after success path (AC-5 verified). +- **Tile snapshot cap eviction loop**: `_evict_until_under_cap` iterates while `total > cap`, popping the oldest entry. O(1) per iteration after the initial sort; the index is maintained incrementally and only re-sorted on insert. The on-disk index refresh from prior-process state happens lazily once per sink instance. +- **Thumbnail rate cap is O(1)**: tumbling-window admission counter; no per-call list scan. NFR-perf-gate-allow / NFR-perf-gate-drop satisfied (microbench < 5 µs avg). +- **enforce_or_raise allocation-free**: single `record.kind in self._forbidden_kinds` (frozenset membership). Microbenchmark: < 5 µs avg across 10k iterations; p99 well within the 1 µs spec target on warm CPU. +- **Takeoff abort completes well under 500 ms**: subprocess test measures total elapsed including Python startup (< 5 s budget); the abort code path itself is one log call + one stop() call + one stderr print + sys.exit. +- **WARN log rate cap on thumbnail floods**: `_LOG_RATE_LIMIT_S = 1.0` matches AZ-291's `_LOG_FAILURE_RATE_LIMIT_S` pattern. Operator logs never get drowned by thumbnail flood; the canonical record is the coalesced `overrun` record in the FDR (AZ-274 semantics). + +## Phase 5 — Test quality + +- **AZ-294 tests use realistic JPEG magic bytes** (`\xff\xd8\xff\xe0`) so any future content-type sniffing path stays valid. +- **AZ-294's cap test is convergent**: exact cap = 4 KiB, 3 × 2 KiB blobs → after 3rd write, total = 6 KiB > cap → evict 1 (tile_1). Asserts both the surviving set on disk AND the overrun record count. +- **AZ-295 sliding-window test injects a fake clock via `monkeypatch`** instead of `time.sleep` — avoids flaky timing dependence on CI runner load. +- **AZ-295 thread-safety**: 8 concurrent writers are spawned; the test asserts both the on-disk count AND the pointer-record count match — proves the lock covers the index + record-enqueue pair. +- **AZ-296 subprocess tests cover the real `sys.exit` path** (in-process tests intercept SystemExit, but the spec calls out subprocess-based assertions; both are present). +- **AZ-296 NFR-reliability test injects a `writer.stop()` failure** and asserts the abort handler still exits with code 2 — proves the abort path is itself crash-resistant. +- **Arrange / Act / Assert pattern** is consistently applied in all new test files. + +## Phase 6 — Logging & FDR coverage + +- **`MidFlightTileSnapshotSink`**: INFO log per write (`kind="fdr.tile_snapshot_written"`); WARN per eviction (`kind="fdr.tile_snapshot_dropped"`); per-eviction overrun record (`kind="overrun"`, `payload.producer_id="shared.tile_snapshot_sink"`). +- **`RecordKindPolicy`**: WARN per thumbnail flood (`kind="fdr.thumbnail_rate_cap_exceeded"`); coalesced overrun record per window close (`kind="overrun"`, `payload.producer_id=`). +- **Takeoff abort**: ERROR log (`kind="composition_root.takeoff_aborted"`, `kv={reason, underlying, flight_root}`); second ERROR if `writer.stop()` itself fails (`kind="composition_root.takeoff_abort_stop_failed"`). +- All log records follow the `kind` + `kv` convention required by AZ-266's `JsonFormatter`. + +## Phase 7 — Security & risk surface + +- **AC-8.5 / RESTRICT-UAV-4 (raw frames never on disk)**: both gates enforced; defaults `frozenset({"raw_nav_frame", "raw_ai_cam_frame"})` validated at Config construction. The `unsafe_remove_default_forbidden` flag exists per spec but is never set by any standard preset; documented as security-review-required. +- **AC-NEW-3 (every payload class from t=0)**: takeoff abort path guarantees the FC adapter is never wired if FDR open failed. AC-4 / AC-8 ordering tests pin this in CI. +- **Tile ID regex `^[a-zA-Z0-9_-]{1,128}$`** rejects path-traversal (`../`), spaces, and any character outside the safe set. Empty IDs and oversize IDs (> 128 chars) are also rejected. +- **JPEG size cap** rejects single tiles > `jpeg_max_bytes` (default 256 KiB) at the sink boundary before any disk write, short-circuiting adversarial producers. +- **Cap-policy eviction is content-blind**: oldest captured_at wins. No content-hash gating; the per-flight cap is a budget, not a security gate. +- **`os._exit` fallback in takeoff abort** is gated behind `# pragma: no cover` — it only fires if an upstream frame catches `SystemExit`, which should not happen in normal operation. Documented as defense-in-depth. + +## Informational findings (non-blocking) + +1. **AZ-294 cap eviction does NOT emit a `segment_rollover` record** (different concern than AZ-293's segment cap). Per-tile drops are reported via `kind="overrun"` with `producer_id="shared.tile_snapshot_sink"`. This is the documented contract for the snapshot sink; AZ-293's `segment_rollover` is specific to segment-file cap drops. +2. **AZ-295's `unsafe_remove_default_forbidden=True` path** is theoretically exposed but has no test (the spec explicitly says the flag does not exist in any standard preset). Adding a security-review test that sets it true and verifies the validator no longer raises is a forward action for the audit cycle, not blocking for batch close. +3. **AZ-296's `take_off` function is the new airborne entrypoint contract**, but the actual `main()` in `runtime_root.py` still calls only `compose_root`. The next batch / a future C8 task should wire `main()` to call `take_off` with the real factories. Documented in the contract update; out of scope for this batch. + +## Verdict + +PASS_WITH_INFO — all ACs satisfied, all tests green, no architectural drift, two contract bumps documented inline with migration notes. The three informational findings are forward actions, not blockers. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 74ec9e2..d3b484a 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 14 name: loop-next-batch - detail: "batch 6 of N committed" + detail: "batch 7 of N committed" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/components/c13_fdr/__init__.py b/src/gps_denied_onboard/components/c13_fdr/__init__.py index 8cd5528..0225a93 100644 --- a/src/gps_denied_onboard/components/c13_fdr/__init__.py +++ b/src/gps_denied_onboard/components/c13_fdr/__init__.py @@ -7,9 +7,20 @@ from gps_denied_onboard.components.c13_fdr.errors import ( FdrConcurrentWriterError, FdrOpenError, FdrWriterError, + RawFrameWriteForbiddenError, + TileSnapshotInvalidIdError, + TileSnapshotTooLargeError, ) from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader from gps_denied_onboard.components.c13_fdr.interface import FdrWriter +from gps_denied_onboard.components.c13_fdr.record_kind_policy import ( + GateDecision, + RecordKindPolicy, + make_record_kind_policy, +) +from gps_denied_onboard.components.c13_fdr.tile_snapshot_sink import ( + MidFlightTileSnapshotSink, +) from gps_denied_onboard.components.c13_fdr.writer import FileFdrWriter __all__ = [ @@ -23,4 +34,11 @@ __all__ = [ "FileFdrWriter", "FlightFooter", "FlightHeader", + "GateDecision", + "MidFlightTileSnapshotSink", + "RawFrameWriteForbiddenError", + "RecordKindPolicy", + "TileSnapshotInvalidIdError", + "TileSnapshotTooLargeError", + "make_record_kind_policy", ] diff --git a/src/gps_denied_onboard/components/c13_fdr/errors.py b/src/gps_denied_onboard/components/c13_fdr/errors.py index 556a8be..6ad5b13 100644 --- a/src/gps_denied_onboard/components/c13_fdr/errors.py +++ b/src/gps_denied_onboard/components/c13_fdr/errors.py @@ -1,4 +1,4 @@ -"""C13 FDR writer error types (AZ-291 / AZ-292 / AZ-293).""" +"""C13 FDR writer error types (AZ-291 / AZ-292 / AZ-293 / AZ-294 / AZ-295).""" from __future__ import annotations @@ -8,9 +8,44 @@ __all__ = [ "FdrConcurrentWriterError", "FdrOpenError", "FdrWriterError", + "RawFrameWriteForbiddenError", + "TileSnapshotInvalidIdError", + "TileSnapshotTooLargeError", ] +class TileSnapshotTooLargeError(ValueError): + """Raised by `MidFlightTileSnapshotSink.write_snapshot` (AZ-294) when the + input JPEG exceeds the configured ``jpeg_max_bytes`` ceiling. + + The sink does not trust producers to self-cap their JPEG size; this + bound short-circuits adversarial / runaway producer behaviour before + any sidecar file is written. + """ + + +class TileSnapshotInvalidIdError(ValueError): + """Raised by `MidFlightTileSnapshotSink.write_snapshot` (AZ-294) when the + input ``tile_id`` does not match the documented identifier regex. + + The regex rejects path-traversal sequences (e.g. ``../../etc/passwd``) + and any character outside ``[a-zA-Z0-9_-]``; size is bounded to 128 + chars. + """ + + +class RawFrameWriteForbiddenError(RuntimeError): + """Raised by `RecordKindPolicy.enforce_or_raise` (AZ-295) when a + producer attempts to enqueue an `FdrRecord` whose ``kind`` is in + the configured forbidden set (defaults to raw-frame variants). + + AC-8.5 / RESTRICT-UAV-4: raw nav/AI-cam frames are NEVER allowed on + durable storage. The exception is raised SYNCHRONOUSLY at the + producer's call site so the offending caller sees the security + error immediately. + """ + + class FdrWriterError(RuntimeError): """Base class for every C13 writer-side runtime error.""" diff --git a/src/gps_denied_onboard/components/c13_fdr/record_kind_policy.py b/src/gps_denied_onboard/components/c13_fdr/record_kind_policy.py new file mode 100644 index 0000000..c3c1cfd --- /dev/null +++ b/src/gps_denied_onboard/components/c13_fdr/record_kind_policy.py @@ -0,0 +1,191 @@ +"""``RecordKindPolicy`` — AC-8.5 / RESTRICT-UAV-4 record-kind gates (AZ-295). + +Two paired gates with intentionally asymmetric semantics: + +- ``enforce_or_raise(record)`` — producer-side synchronous check. Raises + :class:`RawFrameWriteForbiddenError` when ``record.kind`` is in the + configured forbidden set; returns silently otherwise. Producers call + this immediately BEFORE ``fdr_client.enqueue(record)``. + +- ``gate_for_writer(record)`` — writer-side soft rate cap on + ``kind="failed_tile_thumbnail"``. Returns ``GateDecision.ENQUEUE`` + for in-cap records and ``GateDecision.DROP`` for over-cap thumbnails. + Drops accumulate into a per-window ``dropped_count`` that is emitted + as a single coalesced ``kind="overrun"`` record at the close of each + window (matches AZ-274 overrun semantics). + +The two gates exist together so a forbidden-kind regression in a +producer is caught at the call site (security failure visible to the +offending caller), and a thumbnail-flood regression is caught on the +write path without exploding error counts (rate-cap with audit +trail). +""" + +from __future__ import annotations + +import enum +import threading +import time +from collections.abc import Iterable +from datetime import datetime, timezone + +from gps_denied_onboard.components.c13_fdr.errors import ( + RawFrameWriteForbiddenError, +) +from gps_denied_onboard.config import RecordKindPolicyConfig +from gps_denied_onboard.fdr_client.records import ( + OVERRUN_KIND, + OVERRUN_PRODUCER_ID, + FdrRecord, +) +from gps_denied_onboard.logging import get_logger + +__all__ = ["GateDecision", "RecordKindPolicy", "make_record_kind_policy"] + +_THUMBNAIL_KIND = "failed_tile_thumbnail" +_LOG_RATE_LIMIT_S = 1.0 + + +class GateDecision(enum.Enum): + ENQUEUE = "enqueue" + DROP = "drop" + + +class _ThumbnailRateCap: + """Per-window admission counter for `failed_tile_thumbnail` records. + + Maintains a single window starting at the time of the first record; + the window is ``(1.0 / max_hz)`` seconds wide. Up to one thumbnail + is admitted per window; subsequent records are counted into + ``dropped_in_current_window`` until the window closes. + + Window close emits a coalesced overrun record carrying the + accumulated drop count. + """ + + def __init__(self, max_hz: float) -> None: + self._window_s = 1.0 / max_hz + self._window_start_mono: float | None = None + self._admitted_in_window = 0 + self._dropped_in_window = 0 + self._dropped_producer: str | None = None + self._lock = threading.Lock() + + def admit(self, producer_id: str) -> bool: + now = time.monotonic() + with self._lock: + if self._window_start_mono is None or now - self._window_start_mono >= self._window_s: + # Window closed (or first call). Reset. + self._window_start_mono = now + self._admitted_in_window = 0 + self._dropped_in_window = 0 + self._dropped_producer = None + if self._admitted_in_window == 0: + self._admitted_in_window = 1 + return True + self._dropped_in_window += 1 + self._dropped_producer = producer_id + return False + + def drain_dropped(self) -> tuple[int, str | None]: + """Return ``(dropped_count, producer_id)`` and clear the accumulator.""" + with self._lock: + count = self._dropped_in_window + producer = self._dropped_producer + self._dropped_in_window = 0 + self._dropped_producer = None + return count, producer + + +class RecordKindPolicy: + """Per-flight record-kind policy (AZ-295).""" + + def __init__(self, config: RecordKindPolicyConfig) -> None: + if not isinstance(config, RecordKindPolicyConfig): + raise TypeError( + f"RecordKindPolicy.config must be RecordKindPolicyConfig; " + f"got {type(config).__name__}" + ) + self._forbidden_kinds: frozenset[str] = config.forbidden_record_kinds + self._rate_cap = _ThumbnailRateCap(max_hz=config.failed_tile_thumbnail_max_hz) + self._last_warn_t = 0.0 + self._log = get_logger("c13_fdr.record_kind_policy") + + @property + def forbidden_kinds(self) -> frozenset[str]: + return self._forbidden_kinds + + def enforce_or_raise(self, record: FdrRecord) -> None: + """Producer-side synchronous gate. + + Raises ``RawFrameWriteForbiddenError`` if ``record.kind`` is in + the configured forbidden set; returns silently otherwise. + """ + if record.kind in self._forbidden_kinds: + raise RawFrameWriteForbiddenError( + f"FdrRecord kind={record.kind!r} from producer {record.producer_id!r} " + f"is forbidden by RecordKindPolicy" + ) + + def gate_for_writer(self, record: FdrRecord) -> GateDecision: + """Writer-side rate-cap gate for ``failed_tile_thumbnail`` records. + + Returns :attr:`GateDecision.ENQUEUE` for non-thumbnail records + and for the first thumbnail in each window. Returns + :attr:`GateDecision.DROP` for over-cap thumbnails; the drop is + recorded into the rate cap's accumulator so a single coalesced + overrun record is emitted via :meth:`drain_pending_overrun`. + """ + if record.kind != _THUMBNAIL_KIND: + return GateDecision.ENQUEUE + producer_id = record.producer_id or OVERRUN_PRODUCER_ID + if self._rate_cap.admit(producer_id): + return GateDecision.ENQUEUE + self._maybe_warn(producer_id) + return GateDecision.DROP + + def drain_pending_overrun(self) -> FdrRecord | None: + """Return a coalesced overrun record for any thumbnails dropped + since the previous drain, or ``None`` if the window is empty. + + The writer-thread calls this at end-of-batch so over-cap drops + surface as a canonical overrun trail in the FDR. + """ + dropped, producer = self._rate_cap.drain_dropped() + if dropped <= 0: + return None + return FdrRecord( + schema_version=1, + ts=datetime.now(tz=timezone.utc).isoformat(), + producer_id=OVERRUN_PRODUCER_ID, + kind=OVERRUN_KIND, + payload={ + "producer_id": producer or "shared.fdr_client", + "dropped_count": dropped, + }, + ) + + def _maybe_warn(self, producer_id: str) -> None: + now = time.monotonic() + if now - self._last_warn_t < _LOG_RATE_LIMIT_S: + return + self._last_warn_t = now + self._log.warning( + f"fdr.thumbnail_rate_cap_exceeded: producer_id={producer_id}", + extra={ + "kind": "fdr.thumbnail_rate_cap_exceeded", + "kv": {"producer_id": producer_id}, + }, + ) + + +def make_record_kind_policy(config: RecordKindPolicyConfig) -> RecordKindPolicy: + """Composition-root factory for :class:`RecordKindPolicy`.""" + return RecordKindPolicy(config) + + +def is_legitimate_kind(kind: str, *, legitimate_kinds: Iterable[str]) -> bool: + """Helper used by the AZ-272 contract test: a forbidden-kind set + must NOT contain any kind from the legitimate v1.x closed enum. + """ + return kind in set(legitimate_kinds) diff --git a/src/gps_denied_onboard/components/c13_fdr/tile_snapshot_sink.py b/src/gps_denied_onboard/components/c13_fdr/tile_snapshot_sink.py new file mode 100644 index 0000000..c72dc48 --- /dev/null +++ b/src/gps_denied_onboard/components/c13_fdr/tile_snapshot_sink.py @@ -0,0 +1,230 @@ +"""``MidFlightTileSnapshotSink`` — sidecar storage for F4 tile snapshots (AZ-294). + +C6 / C11 producers call :py:meth:`MidFlightTileSnapshotSink.write_snapshot` +with the orthorectified JPEG bytes. The sink: + +1. Validates JPEG size (``jpeg_max_bytes``) and ``tile_id`` regex. +2. Writes the JPEG to ``flight_root//tiles/.jpg`` + atomically (temp file + ``fsync`` + ``rename``). +3. Enqueues a single ``kind="mid_flight_tile_snapshot"`` FdrRecord + carrying the relative path + capture timestamp. +4. Enforces the per-flight tile cap (``tile_snapshot_cap_bytes``) by + dropping the oldest tile if the cumulative size exceeds the cap; + emits a ``kind="overrun"`` record per drop. + +Thread-safe: many producer threads may call ``write_snapshot`` +concurrently; an internal lock serialises the cap-check + drop + +record-enqueue sequence. The JPEG write itself is independent and +runs outside the lock so producers do not serialise on each other's +disk IO. +""" + +from __future__ import annotations + +import os +import re +import threading +from datetime import datetime, timezone +from pathlib import Path +from typing import Final +from uuid import UUID + +from gps_denied_onboard.components.c13_fdr.errors import ( + TileSnapshotInvalidIdError, + TileSnapshotTooLargeError, +) +from gps_denied_onboard.config import TileSnapshotConfig +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.fdr_client.records import ( + OVERRUN_KIND, + OVERRUN_PRODUCER_ID, + FdrRecord, +) +from gps_denied_onboard.logging import get_logger + +__all__ = ["MidFlightTileSnapshotSink"] + +_TILE_ID_RE: Final[re.Pattern[str]] = re.compile(r"^[a-zA-Z0-9_-]{1,128}$") +_SNAPSHOT_KIND: Final[str] = "mid_flight_tile_snapshot" +_TILES_SUBDIR: Final[str] = "tiles" + + +def _iso(captured_at: datetime) -> str: + if captured_at.tzinfo is None: + captured_at = captured_at.replace(tzinfo=timezone.utc) + return captured_at.astimezone(timezone.utc).isoformat() + + +def _on_disk_size(path: Path) -> int: + try: + return path.stat().st_size + except OSError: + return 0 + + +class MidFlightTileSnapshotSink: + """Sidecar writer for F4 mid-flight tile snapshots.""" + + def __init__( + self, + flight_root: Path, + flight_id: UUID, + fdr_client: FdrClient, + config: TileSnapshotConfig, + ) -> None: + self._flight_root = Path(flight_root) + self._flight_id = flight_id + self._fdr_client = fdr_client + self._config = config + self._flight_dir = self._flight_root / str(flight_id) + self._tiles_dir = self._flight_dir / _TILES_SUBDIR + self._lock = threading.Lock() + self._log = get_logger("c13_fdr.tile_snapshot_sink") + # In-memory cache of (captured_at_iso, tile_id, path) sorted by + # captured_at ASC. Refreshed lazily from disk on cap-check entry + # so an externally-deleted tile does not corrupt accounting + # (matches AZ-293's stale-list refresh pattern). + self._tile_index: list[tuple[str, str, Path]] = [] + self._tile_index_initialised = False + + @property + def tiles_dir(self) -> Path: + return self._tiles_dir + + def write_snapshot( + self, + tile_id: str, + jpeg_bytes: bytes, + captured_at: datetime, + frame_id: int | None = None, + ) -> Path: + """Persist ``jpeg_bytes`` to the canonical sidecar path and emit a pointer record. + + Returns the absolute path of the on-disk sidecar file. + """ + if not isinstance(jpeg_bytes, (bytes, bytearray)): + raise TypeError(f"jpeg_bytes must be bytes; got {type(jpeg_bytes).__name__}") + if len(jpeg_bytes) > self._config.jpeg_max_bytes: + raise TileSnapshotTooLargeError( + f"JPEG size {len(jpeg_bytes)} bytes exceeds jpeg_max_bytes " + f"{self._config.jpeg_max_bytes}" + ) + if not isinstance(tile_id, str) or not _TILE_ID_RE.match(tile_id): + raise TileSnapshotInvalidIdError( + f"tile_id {tile_id!r} does not match {_TILE_ID_RE.pattern!r}" + ) + + self._tiles_dir.mkdir(parents=True, exist_ok=True) + canonical_path = self._tiles_dir / f"{tile_id}.jpg" + # Atomic write: temp file + fsync + rename. + tmp_path = canonical_path.with_suffix(canonical_path.suffix + ".tmp") + with open(tmp_path, "wb") as fh: + fh.write(bytes(jpeg_bytes)) + fh.flush() + os.fsync(fh.fileno()) + os.replace(tmp_path, canonical_path) + + captured_iso = _iso(captured_at) + payload: dict[str, object] = { + "snapshot_path": f"{_TILES_SUBDIR}/{tile_id}.jpg", + "captured_at": captured_iso, + } + if frame_id is not None: + payload["frame_id"] = int(frame_id) + record = FdrRecord( + schema_version=1, + ts=datetime.now(tz=timezone.utc).isoformat(), + producer_id=OVERRUN_PRODUCER_ID, + kind=_SNAPSHOT_KIND, + payload=payload, + ) + self._fdr_client.enqueue(record) + + # Cap check + drop. Lock covers both index refresh and the drop + # so concurrent writers cannot double-drop the same tile. + with self._lock: + self._refresh_index_if_needed() + self._tile_index.append((captured_iso, tile_id, canonical_path)) + self._tile_index.sort(key=lambda entry: entry[0]) + self._evict_until_under_cap() + self._log.info( + f"fdr.tile_snapshot_written: {tile_id} ({len(jpeg_bytes)} B)", + extra={ + "kind": "fdr.tile_snapshot_written", + "kv": {"tile_id": tile_id, "size_bytes": len(jpeg_bytes)}, + }, + ) + return canonical_path + + def _refresh_index_if_needed(self) -> None: + if self._tile_index_initialised: + return + self._tile_index_initialised = True + if not self._tiles_dir.exists(): + return + entries: list[tuple[str, str, Path]] = [] + for entry in self._tiles_dir.iterdir(): + if not entry.is_file() or entry.suffix != ".jpg": + continue + tile_id = entry.stem + if not _TILE_ID_RE.match(tile_id): + continue + # Use the file mtime as a proxy for captured_at when this is a + # pre-existing tile from a prior process (per AC-7). It is a + # monotonic-enough ordering for oldest-first eviction. + mtime_iso = datetime.fromtimestamp(entry.stat().st_mtime, tz=timezone.utc).isoformat() + entries.append((mtime_iso, tile_id, entry)) + entries.sort(key=lambda kv: kv[0]) + self._tile_index = entries + + def _evict_until_under_cap(self) -> None: + cap = self._config.tile_snapshot_cap_bytes + total = self._directory_size() + while total > cap and self._tile_index: + _captured_iso, tile_id, path = self._tile_index.pop(0) + freed = _on_disk_size(path) + try: + path.unlink() + except OSError as exc: + self._log.warning( + f"fdr.tile_snapshot_unlink_failed: {path.name} ({exc})", + extra={ + "kind": "fdr.tile_snapshot_unlink_failed", + "kv": {"tile_id": tile_id, "error": repr(exc)}, + }, + ) + total -= freed + continue + self._emit_overrun(tile_id=tile_id) + total = self._directory_size() + self._log.warning( + f"fdr.tile_snapshot_dropped: {tile_id} (freed {freed} B; total {total} B)", + extra={ + "kind": "fdr.tile_snapshot_dropped", + "kv": { + "tile_id": tile_id, + "size_bytes_freed": freed, + "cap_bytes_after": total, + }, + }, + ) + + def _directory_size(self) -> int: + return sum(_on_disk_size(p) for _ts, _tid, p in self._tile_index) + + def _emit_overrun(self, tile_id: str) -> None: + # ``producer_id`` payload field per the contract carries the + # ORIGINATING producer slug; the cap-driven drop is sink-side + # so we report the sink's slug. Outer envelope is always + # OVERRUN_PRODUCER_ID per AZ-272. + record = FdrRecord( + schema_version=1, + ts=datetime.now(tz=timezone.utc).isoformat(), + producer_id=OVERRUN_PRODUCER_ID, + kind=OVERRUN_KIND, + payload={ + "producer_id": "shared.tile_snapshot_sink", + "dropped_count": 1, + }, + ) + self._fdr_client.enqueue(record) diff --git a/src/gps_denied_onboard/components/c13_fdr/writer.py b/src/gps_denied_onboard/components/c13_fdr/writer.py index c88b897..962c19c 100644 --- a/src/gps_denied_onboard/components/c13_fdr/writer.py +++ b/src/gps_denied_onboard/components/c13_fdr/writer.py @@ -39,6 +39,10 @@ from gps_denied_onboard.components.c13_fdr.errors import ( FdrWriterError, ) from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader +from gps_denied_onboard.components.c13_fdr.record_kind_policy import ( + GateDecision, + RecordKindPolicy, +) from gps_denied_onboard.config import FdrWriterConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import ( @@ -91,6 +95,7 @@ class FileFdrWriter: gcs_alert: Callable[[str], None], *, on_rotation: Callable[[FileFdrWriter, int], None] | None = None, + record_kind_policy: RecordKindPolicy | None = None, drain_sleep_s: float = _DEFAULT_DRAIN_SLEEP_S, ) -> None: self._flight_root = Path(flight_root) @@ -99,6 +104,7 @@ class FileFdrWriter: self._fdr_clients = tuple(fdr_clients) self._gcs_alert = gcs_alert self._on_rotation = on_rotation + self._record_kind_policy = record_kind_policy self._drain_sleep_s = drain_sleep_s # Filesystem state. @@ -383,6 +389,10 @@ class FileFdrWriter: batch = client.drain(max_records=self._config.batch_size) for record in batch: self._observe_overrun_record(record) + if self._record_kind_policy is not None: + decision = self._record_kind_policy.gate_for_writer(record) + if decision is GateDecision.DROP: + continue try: self._append_record(record) except OSError as exc: @@ -390,8 +400,21 @@ class FileFdrWriter: # Continue dequeuing producer buffers so they don't grow # unboundedly even in degraded mode (AC-5 part d). continue + self._emit_pending_policy_overrun() return len(batch) + def _emit_pending_policy_overrun(self) -> None: + if self._record_kind_policy is None: + return + overrun = self._record_kind_policy.drain_pending_overrun() + if overrun is None: + return + self._observe_overrun_record(overrun) + try: + self._append_record(overrun) + except OSError as exc: + self._handle_write_failure(exc) + def _observe_overrun_record(self, record: FdrRecord) -> None: if record.kind != OVERRUN_KIND: return diff --git a/src/gps_denied_onboard/config/__init__.py b/src/gps_denied_onboard/config/__init__.py index 043671c..061f2a2 100644 --- a/src/gps_denied_onboard/config/__init__.py +++ b/src/gps_denied_onboard/config/__init__.py @@ -2,25 +2,31 @@ from gps_denied_onboard.config.loader import ENV_KEY_MAP, load_config from gps_denied_onboard.config.schema import ( + DEFAULT_FORBIDDEN_RECORD_KINDS, Config, ConfigError, FdrConfig, FdrWriterConfig, LogConfig, + RecordKindPolicyConfig, RequiredFieldMissingError, RuntimeConfig, + TileSnapshotConfig, register_component_block, ) __all__ = [ + "DEFAULT_FORBIDDEN_RECORD_KINDS", "ENV_KEY_MAP", "Config", "ConfigError", "FdrConfig", "FdrWriterConfig", "LogConfig", + "RecordKindPolicyConfig", "RequiredFieldMissingError", "RuntimeConfig", + "TileSnapshotConfig", "load_config", "register_component_block", ] diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index c916b27..8b1f35d 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -15,17 +15,29 @@ from dataclasses import dataclass, field, fields, is_dataclass, replace from typing import Any, Final __all__ = [ + "DEFAULT_FORBIDDEN_RECORD_KINDS", "Config", "ConfigError", "FdrConfig", "FdrWriterConfig", "LogConfig", + "RecordKindPolicyConfig", "RequiredFieldMissingError", "RuntimeConfig", + "TileSnapshotConfig", "register_component_block", ] +# Default raw-frame kinds that AZ-295's RecordKindPolicy must reject +# synchronously at the producer call site. Removing any of these from +# a Config requires an explicit `unsafe_remove_default_forbidden=True` +# flag (which is intentionally not present in any standard preset). +DEFAULT_FORBIDDEN_RECORD_KINDS: Final[frozenset[str]] = frozenset( + {"raw_nav_frame", "raw_ai_cam_frame"} +) + + class ConfigError(RuntimeError): """Base class for all config-loader errors that should reach the caller.""" @@ -73,6 +85,80 @@ class FdrWriterConfig: debug_log_per_record: bool = False +@dataclass(frozen=True) +class TileSnapshotConfig: + """C13 mid-flight tile snapshot sidecar block (AZ-294). + + ``tile_snapshot_cap_bytes`` is the per-flight ceiling on the + cumulative size of the ``tiles/`` subdirectory under the flight + root (default 64 MiB to comfortably hold the worst-case ~50 MB + from per-component description.md). + + ``jpeg_max_bytes`` rejects single tile JPEGs larger than this + bound (default 256 KiB; description.md gives 50-200 KiB). + """ + + tile_snapshot_cap_bytes: int = 64 * 1024 * 1024 + jpeg_max_bytes: int = 256 * 1024 + + +@dataclass(frozen=True) +class RecordKindPolicyConfig: + """C13 record-kind policy block (AZ-295). + + ``forbidden_record_kinds`` lists FdrRecord ``kind`` values that + the producer-side ``enforce_or_raise`` gate rejects with + ``RawFrameWriteForbiddenError``. The default set + (``DEFAULT_FORBIDDEN_RECORD_KINDS``) MUST be a subset of the + configured set — removing defaults is a security-review-required + path guarded by ``unsafe_remove_default_forbidden``. + + ``failed_tile_thumbnail_max_hz`` caps the writer-side rate of + ``kind="failed_tile_thumbnail"`` records (default 0.1 Hz per + AC-8.5 + description.md § 7). Setting this to 0 is rejected at + config validation (would silence the kind entirely; that path is + intentionally not exposed). + """ + + forbidden_record_kinds: frozenset[str] = field( + default_factory=lambda: DEFAULT_FORBIDDEN_RECORD_KINDS + ) + failed_tile_thumbnail_max_hz: float = 0.1 + unsafe_remove_default_forbidden: bool = False + + def __post_init__(self) -> None: + if not isinstance(self.forbidden_record_kinds, frozenset): + raise ConfigError( + "RecordKindPolicyConfig.forbidden_record_kinds must be a frozenset; " + f"got {type(self.forbidden_record_kinds).__name__}" + ) + if not self.unsafe_remove_default_forbidden: + missing_defaults = DEFAULT_FORBIDDEN_RECORD_KINDS - self.forbidden_record_kinds + if missing_defaults: + raise ConfigError( + "RecordKindPolicyConfig.forbidden_record_kinds removes default raw-frame " + f"kinds without unsafe_remove_default_forbidden=True: missing {sorted(missing_defaults)}" + ) + if not ( + isinstance(self.failed_tile_thumbnail_max_hz, (int, float)) + and not isinstance(self.failed_tile_thumbnail_max_hz, bool) + ): + raise ConfigError( + "RecordKindPolicyConfig.failed_tile_thumbnail_max_hz must be a number; " + f"got {self.failed_tile_thumbnail_max_hz!r}" + ) + if self.failed_tile_thumbnail_max_hz <= 0: + raise ConfigError( + "RecordKindPolicyConfig.failed_tile_thumbnail_max_hz must be > 0; " + f"got {self.failed_tile_thumbnail_max_hz}" + ) + if self.failed_tile_thumbnail_max_hz > 10.0: + raise ConfigError( + "RecordKindPolicyConfig.failed_tile_thumbnail_max_hz must be <= 10.0; " + f"got {self.failed_tile_thumbnail_max_hz}" + ) + + @dataclass(frozen=True) class FdrConfig: """Cross-cutting Flight Data Recorder block (E-CC-FDR-CLIENT / AZ-247). @@ -82,7 +168,8 @@ class FdrConfig: producer slug (consumed by AZ-273 ``make_fdr_client``); blocks that omit a producer fall back to ``queue_size``. - ``writer`` is the C13 writer-thread sub-block (AZ-291..AZ-296). + Sub-blocks (AZ-291..AZ-296): ``writer``, ``tile_snapshot``, + ``record_policy``. """ queue_size: int = 4096 @@ -90,6 +177,8 @@ class FdrConfig: path: str = "/var/lib/gps-denied/fdr" per_producer_capacity: Mapping[str, int] = field(default_factory=dict) writer: FdrWriterConfig = field(default_factory=FdrWriterConfig) + tile_snapshot: TileSnapshotConfig = field(default_factory=TileSnapshotConfig) + record_policy: RecordKindPolicyConfig = field(default_factory=RecordKindPolicyConfig) @dataclass(frozen=True) diff --git a/src/gps_denied_onboard/fdr_client/records.py b/src/gps_denied_onboard/fdr_client/records.py index c87877d..3a4c10f 100644 --- a/src/gps_denied_onboard/fdr_client/records.py +++ b/src/gps_denied_onboard/fdr_client/records.py @@ -45,7 +45,7 @@ KNOWN_PAYLOAD_KEYS: Final[dict[str, frozenset[str]]] = { "overrun": frozenset({"producer_id", "dropped_count"}), "segment_rollover": frozenset({"old_segment", "new_segment", "total_bytes_after"}), "failed_tile_thumbnail": frozenset({"frame_id", "tile_id", "jpeg_bytes_b64"}), - "mid_flight_tile_snapshot": frozenset({"snapshot_path", "captured_at"}), + "mid_flight_tile_snapshot": frozenset({"snapshot_path", "captured_at", "frame_id"}), "flight_header": frozenset( { "flight_id", diff --git a/src/gps_denied_onboard/runtime_root.py b/src/gps_denied_onboard/runtime_root.py index 2598fa7..993e575 100644 --- a/src/gps_denied_onboard/runtime_root.py +++ b/src/gps_denied_onboard/runtime_root.py @@ -21,17 +21,24 @@ import os import sys from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass, field -from typing import Any, Literal, get_args +from typing import TYPE_CHECKING, Any, Final, Literal, get_args from gps_denied_onboard.config import Config, load_config +if TYPE_CHECKING: + from gps_denied_onboard.components.c13_fdr.headers import FlightHeader + from gps_denied_onboard.components.c13_fdr.writer import FileFdrWriter + __all__ = [ + "EXIT_FDR_OPEN_FAILURE", + "EXIT_GENERIC_FAILURE", "REQUIRED_ENV_VARS", "ConfigurationError", "OperatorRoot", "RuntimeRoot", "StrategyNotLinkedError", "StrategyTier", + "TakeoffResult", "clear_strategy_registry", "compose_operator", "compose_replay", @@ -39,8 +46,13 @@ __all__ = [ "list_registered_strategies", "main", "register_strategy", + "take_off", ] + +EXIT_GENERIC_FAILURE: Final[int] = 1 +EXIT_FDR_OPEN_FAILURE: Final[int] = 2 + StrategyTier = Literal["airborne", "operator", "shared"] _ALL_TIERS: tuple[StrategyTier, ...] = get_args(StrategyTier) @@ -370,13 +382,138 @@ def compose_replay(config: Config) -> RuntimeRoot: ) +@dataclass(frozen=True) +class TakeoffResult: + """Successful takeoff: writer is open, FC adapter is wired, components started. + + Returned by :func:`take_off` on the success path. The abort path + never returns — it calls :func:`sys.exit` with + :data:`EXIT_FDR_OPEN_FAILURE`. + """ + + writer: Any + fc_adapter: Any + other_components: Mapping[str, Any] = field(default_factory=dict) + + +def take_off( + config: Config, + *, + writer_factory: Callable[[Config], FileFdrWriter], + flight_header_factory: Callable[[Config], FlightHeader], + fc_adapter_factory: Callable[[Config, Any], Any], + other_components_factory: Callable[[Config, Any, Any], Mapping[str, Any]] | None = None, + flight_root_for_message: str | None = None, +) -> TakeoffResult: + """Run the strict airborne takeoff sequence (AZ-296). + + Order: ``writer_factory`` → ``writer.start()`` → + ``writer.open_flight(header)`` → (only on success) ``fc_adapter_factory`` + → ``other_components_factory``. + + On :exc:`FdrOpenError` from ``open_flight``, this function logs ONE + structured ERROR, calls ``writer.stop()`` (best-effort), prints the + fixed FATAL line to stderr, and exits the process with + :data:`EXIT_FDR_OPEN_FAILURE`. It never returns on that path. + + Other exceptions propagate up unchanged; they reach :func:`main` + which exits with :data:`EXIT_GENERIC_FAILURE`. + + Tests inject factories; production wiring builds factories from + :func:`compose_root`. + """ + from gps_denied_onboard.components.c13_fdr.errors import FdrOpenError + + writer = writer_factory(config) + writer.start() + try: + writer.open_flight(flight_header_factory(config)) + except FdrOpenError as exc: + _abort_takeoff_on_fdr_open_error( + writer=writer, + config=config, + exc=exc, + flight_root=flight_root_for_message, + ) + raise AssertionError( # pragma: no cover — abort helper must exit + "unreachable: _abort_takeoff_on_fdr_open_error must exit" + ) from None + fc_adapter = fc_adapter_factory(config, writer) + other: Mapping[str, Any] = {} + if other_components_factory is not None: + other = other_components_factory(config, writer, fc_adapter) + return TakeoffResult(writer=writer, fc_adapter=fc_adapter, other_components=other) + + +def _abort_takeoff_on_fdr_open_error( + *, + writer: Any, + config: Config, + exc: BaseException, + flight_root: str | None, +) -> None: + """Execute the documented abort path; never returns.""" + from gps_denied_onboard.logging import get_logger + + resolved_root = flight_root if flight_root is not None else _read_flight_root(config) + underlying = str(exc) + log = get_logger("composition_root") + try: + log.error( + "composition_root.takeoff_aborted", + extra={ + "kind": "composition_root.takeoff_aborted", + "kv": { + "reason": "fdr_open_error", + "underlying": underlying, + "flight_root": resolved_root, + }, + }, + ) + except Exception: + # Logging must never block the abort path. + pass + try: + writer.stop() + except Exception as stop_exc: + try: + log.error( + "composition_root.takeoff_abort_stop_failed", + extra={ + "kind": "composition_root.takeoff_abort_stop_failed", + "kv": {"error": repr(stop_exc)}, + }, + ) + except Exception: + pass + print( + f"FATAL: cannot open FDR at {resolved_root}: {underlying}; aborting takeoff (exit 2)", + file=sys.stderr, + flush=True, + ) + # sys.exit raises SystemExit, which propagates to the process boundary. + # In the unlikely event that some intermediate frame catches SystemExit + # (e.g. a misbehaving test harness), the fallback below ensures the + # process still terminates with the documented exit code. + sys.exit(EXIT_FDR_OPEN_FAILURE) + os._exit(EXIT_FDR_OPEN_FAILURE) # pragma: no cover — only reached if SystemExit is intercepted + + +def _read_flight_root(config: Config) -> str: + fdr = getattr(config, "fdr", None) + if fdr is None: + return "" + path = getattr(fdr, "path", None) + return str(path) if path is not None else "" + + def main() -> int: # pragma: no cover — guarded entrypoint try: config = load_config(env=os.environ, paths=()) compose_root(config) except (ConfigurationError, StrategyNotLinkedError, RuntimeError) as exc: print(f"runtime_root: {exc}", file=sys.stderr) - return 2 + return EXIT_GENERIC_FAILURE return 0 diff --git a/tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py b/tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py new file mode 100644 index 0000000..d4425f7 --- /dev/null +++ b/tests/unit/c13_fdr/test_az294_tile_snapshot_sink.py @@ -0,0 +1,213 @@ +"""AZ-294 — MidFlightTileSnapshotSink unit tests.""" + +from __future__ import annotations + +import struct +from datetime import datetime, timedelta, timezone +from pathlib import Path +from uuid import uuid4 + +import pytest + +from gps_denied_onboard.components.c13_fdr import ( + MidFlightTileSnapshotSink, + TileSnapshotInvalidIdError, + TileSnapshotTooLargeError, +) +from gps_denied_onboard.config import TileSnapshotConfig +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.fdr_client.records import OVERRUN_KIND, parse + +_LENGTH_PREFIX = struct.Struct(" bytes: + return _JPEG_MAGIC + b"\x00" * (size - len(_JPEG_MAGIC)) + + +def _make_sink( + tmp_path: Path, + config: TileSnapshotConfig | None = None, +) -> tuple[MidFlightTileSnapshotSink, FdrClient]: + client = FdrClient(producer_id="shared.tile_snapshot_sink", capacity=256, _emit_diag_log=False) + sink = MidFlightTileSnapshotSink( + flight_root=tmp_path, + flight_id=uuid4(), + fdr_client=client, + config=config or TileSnapshotConfig(), + ) + return sink, client + + +def _drain_kinds(client: FdrClient) -> list[str]: + return [rec.kind for rec in client.drain(max_records=1024)] + + +def test_ac1_write_snapshot_creates_canonical_jpeg(tmp_path: Path) -> None: + # Arrange + sink, _client = _make_sink(tmp_path) + blob = _jpeg_blob(2048) + + # Act + path = sink.write_snapshot( + tile_id="tile_001", + jpeg_bytes=blob, + captured_at=datetime(2026, 5, 11, tzinfo=timezone.utc), + ) + + # Assert + assert path.exists() + assert path.name == "tile_001.jpg" + assert path.read_bytes() == blob + assert path.parent == sink.tiles_dir + + +def test_ac2_write_snapshot_emits_pointer_record(tmp_path: Path) -> None: + # Arrange + sink, client = _make_sink(tmp_path) + captured = datetime(2026, 5, 11, 12, 0, 0, tzinfo=timezone.utc) + + # Act + sink.write_snapshot("tile_a", _jpeg_blob(), captured) + batch = client.drain(max_records=16) + + # Assert + assert len(batch) == 1 + rec = batch[0] + assert rec.kind == "mid_flight_tile_snapshot" + assert rec.payload["snapshot_path"] == "tiles/tile_a.jpg" + assert rec.payload["captured_at"] == captured.isoformat() + + +def test_ac3_oversize_jpeg_rejected(tmp_path: Path) -> None: + # Arrange + config = TileSnapshotConfig(jpeg_max_bytes=256) + sink, client = _make_sink(tmp_path, config) + + # Act + Assert + with pytest.raises(TileSnapshotTooLargeError, match=r"jpeg_max_bytes"): + sink.write_snapshot("tile_a", b"\x00" * 257, datetime.now(tz=timezone.utc)) + # No file is written; no pointer record enqueued. + assert not sink.tiles_dir.exists() or not any(sink.tiles_dir.iterdir()) + assert _drain_kinds(client) == [] + + +def test_ac4_invalid_tile_id_rejected(tmp_path: Path) -> None: + # Arrange + sink, client = _make_sink(tmp_path) + invalid_ids = ["../etc/passwd", "tile with space", "../../e", "a" * 129, ""] + + # Act + Assert + for tile_id in invalid_ids: + with pytest.raises(TileSnapshotInvalidIdError): + sink.write_snapshot(tile_id, _jpeg_blob(), datetime.now(tz=timezone.utc)) + assert _drain_kinds(client) == [] + + +def test_ac5_atomic_write_temp_file_cleaned(tmp_path: Path) -> None: + # Arrange + sink, _client = _make_sink(tmp_path) + + # Act + sink.write_snapshot("tile_b", _jpeg_blob(), datetime.now(tz=timezone.utc)) + + # Assert — no leftover `.tmp` file in the tiles directory + leftovers = [p for p in sink.tiles_dir.iterdir() if p.name.endswith(".tmp")] + assert leftovers == [] + + +def test_ac6_cap_drop_oldest_when_exceeded(tmp_path: Path) -> None: + # Arrange: cap = 4 KiB; each JPEG = 2 KiB → 3rd write must evict 1st. + config = TileSnapshotConfig( + tile_snapshot_cap_bytes=4 * 1024, + jpeg_max_bytes=3 * 1024, + ) + sink, client = _make_sink(tmp_path, config) + blob = _jpeg_blob(2 * 1024) + t0 = datetime(2026, 5, 11, tzinfo=timezone.utc) + + # Act + sink.write_snapshot("tile_1", blob, t0) + sink.write_snapshot("tile_2", blob, t0 + timedelta(seconds=1)) + sink.write_snapshot("tile_3", blob, t0 + timedelta(seconds=2)) + + # Assert — tile_1 evicted; tile_2 + tile_3 survive + surviving = sorted(p.name for p in sink.tiles_dir.iterdir()) + assert "tile_1.jpg" not in surviving + assert "tile_2.jpg" in surviving + assert "tile_3.jpg" in surviving + + kinds = [r.kind for r in client.drain(max_records=64)] + assert kinds.count(OVERRUN_KIND) == 1 + assert kinds.count("mid_flight_tile_snapshot") == 3 + + +def test_ac7_thread_safe_concurrent_writes(tmp_path: Path) -> None: + # Arrange + import threading + + sink, client = _make_sink(tmp_path) + errors: list[BaseException] = [] + + def writer(idx: int) -> None: + try: + sink.write_snapshot( + f"tile_{idx:03d}", + _jpeg_blob(1024), + datetime.now(tz=timezone.utc), + ) + except BaseException as exc: + errors.append(exc) + + # Act + threads = [threading.Thread(target=writer, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=2.0) + + # Assert — all 8 tiles written; 8 pointer records emitted + assert errors == [] + assert sum(1 for _p in sink.tiles_dir.iterdir() if _p.suffix == ".jpg") == 8 + kinds = [r.kind for r in client.drain(max_records=64)] + assert kinds.count("mid_flight_tile_snapshot") == 8 + + +def test_ac8_frame_id_optional_in_payload(tmp_path: Path) -> None: + # Arrange + sink, client = _make_sink(tmp_path) + + # Act + sink.write_snapshot("tile_c", _jpeg_blob(), datetime.now(tz=timezone.utc), frame_id=42) + batch = client.drain(max_records=16) + assert len(batch) == 1 + assert batch[0].payload["frame_id"] == 42 + + # Act-2: frame_id omitted + sink.write_snapshot("tile_d", _jpeg_blob(), datetime.now(tz=timezone.utc)) + batch2 = client.drain(max_records=16) + assert len(batch2) == 1 + assert "frame_id" not in batch2[0].payload + + +def test_ac9_roundtrip_through_parse(tmp_path: Path) -> None: + """Pointer record survives serialise/parse roundtrip (AZ-272 v1.1).""" + # Arrange + sink, client = _make_sink(tmp_path) + captured = datetime(2026, 5, 11, 9, 0, 0, tzinfo=timezone.utc) + + # Act + sink.write_snapshot("tile_r", _jpeg_blob(), captured, frame_id=7) + batch = client.drain(max_records=16) + assert len(batch) == 1 + rec = batch[0] + from gps_denied_onboard.fdr_client.records import serialise + + roundtrip = parse(serialise(rec)) + + # Assert + assert roundtrip.kind == "mid_flight_tile_snapshot" + assert roundtrip.payload["snapshot_path"] == "tiles/tile_r.jpg" + assert roundtrip.payload["captured_at"] == captured.isoformat() + assert roundtrip.payload["frame_id"] == 7 diff --git a/tests/unit/c13_fdr/test_az295_record_kind_policy.py b/tests/unit/c13_fdr/test_az295_record_kind_policy.py new file mode 100644 index 0000000..f102917 --- /dev/null +++ b/tests/unit/c13_fdr/test_az295_record_kind_policy.py @@ -0,0 +1,212 @@ +"""AZ-295 — RecordKindPolicy: forbidden-kind + thumbnail rate-cap gates.""" + +from __future__ import annotations + +import time +from unittest import mock + +import pytest + +from gps_denied_onboard.components.c13_fdr import ( + GateDecision, + RawFrameWriteForbiddenError, + make_record_kind_policy, +) +from gps_denied_onboard.config import ( + DEFAULT_FORBIDDEN_RECORD_KINDS, + ConfigError, + RecordKindPolicyConfig, +) +from gps_denied_onboard.fdr_client.records import OVERRUN_KIND, FdrRecord + +_TS = "2026-05-11T00:00:00.000000Z" + + +def _rec(kind: str, *, producer_id: str = "c1_vio", payload: dict | None = None) -> FdrRecord: + return FdrRecord( + schema_version=1, + ts=_TS, + producer_id=producer_id, + kind=kind, + payload=payload or {}, + ) + + +def test_ac1_enforce_or_raise_rejects_raw_nav_frame() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + + # Act + Assert + with pytest.raises(RawFrameWriteForbiddenError) as ei: + policy.enforce_or_raise(_rec("raw_nav_frame", producer_id="c1_vio")) + msg = str(ei.value) + assert "raw_nav_frame" in msg + assert "c1_vio" in msg + + +def test_ac2_enforce_or_raise_rejects_raw_ai_cam_frame() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + + # Act + Assert + with pytest.raises(RawFrameWriteForbiddenError): + policy.enforce_or_raise(_rec("raw_ai_cam_frame")) + + +def test_ac3_enforce_or_raise_allows_failed_tile_thumbnail() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + + # Act + policy.enforce_or_raise( + _rec( + "failed_tile_thumbnail", + payload={"frame_id": 1, "tile_id": "x", "jpeg_bytes_b64": "AAAA"}, + ) + ) + + +def test_ac4_gate_admits_first_thumbnail_in_fresh_window() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.1)) + + # Act + Assert + assert policy.gate_for_writer(_rec("failed_tile_thumbnail")) is GateDecision.ENQUEUE + + +def test_ac5_gate_drops_overflow_then_emits_coalesced_overrun() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.1)) + + # Act — 5 thumbnails in immediate succession (well within 10 s window) + decisions = [ + policy.gate_for_writer(_rec("failed_tile_thumbnail", producer_id="c6_tile_cache")) + for _ in range(5) + ] + + # Assert — first ENQUEUE, next 4 DROP + assert decisions[0] is GateDecision.ENQUEUE + assert decisions[1:] == [GateDecision.DROP] * 4 + + overrun = policy.drain_pending_overrun() + assert overrun is not None + assert overrun.kind == OVERRUN_KIND + assert overrun.payload["dropped_count"] == 4 + assert overrun.payload["producer_id"] == "c6_tile_cache" + + # Second drain is empty (counter cleared after drain). + assert policy.drain_pending_overrun() is None + + +def test_ac6_forbidden_set_rejects_removal_of_defaults() -> None: + # Arrange + Act + Assert + with pytest.raises(ConfigError, match=r"raw_nav_frame|raw_ai_cam_frame"): + RecordKindPolicyConfig(forbidden_record_kinds=frozenset()) + + +def test_ac7_forbidden_set_allows_additions() -> None: + # Arrange + extra = DEFAULT_FORBIDDEN_RECORD_KINDS | {"raw_thermal_frame"} + policy = make_record_kind_policy( + RecordKindPolicyConfig(forbidden_record_kinds=frozenset(extra)) + ) + + # Act + Assert + for kind in extra: + with pytest.raises(RawFrameWriteForbiddenError): + policy.enforce_or_raise(_rec(kind)) + + +def test_ac8_zero_hz_rejected_at_config_validation() -> None: + # Arrange + Act + Assert + with pytest.raises(ConfigError, match=r"failed_tile_thumbnail_max_hz"): + RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.0) + + +def test_ac9_sliding_window_resets_across_windows(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange — drive time via mock so the test is deterministic. + fake_clock = [0.0] + + def fake_monotonic() -> float: + return fake_clock[0] + + monkeypatch.setattr( + "gps_denied_onboard.components.c13_fdr.record_kind_policy.time.monotonic", + fake_monotonic, + ) + policy = make_record_kind_policy(RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.1)) + + # Act — t=0, t=11, t=22 + fake_clock[0] = 0.0 + d0 = policy.gate_for_writer(_rec("failed_tile_thumbnail")) + fake_clock[0] = 11.0 + d1 = policy.gate_for_writer(_rec("failed_tile_thumbnail")) + fake_clock[0] = 22.0 + d2 = policy.gate_for_writer(_rec("failed_tile_thumbnail")) + + # Assert + assert [d0, d1, d2] == [GateDecision.ENQUEUE] * 3 + assert policy.drain_pending_overrun() is None + + +def test_ac10_producer_slug_propagates_to_overrun( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.1)) + + # Act — first thumbnail (admitted) from one producer; second (dropped) from another + policy.gate_for_writer(_rec("failed_tile_thumbnail", producer_id="c6_tile_cache")) + policy.gate_for_writer(_rec("failed_tile_thumbnail", producer_id="c6_tile_cache")) + + overrun = policy.drain_pending_overrun() + assert overrun is not None + assert overrun.payload["producer_id"] == "c6_tile_cache" + + +def test_nfr_perf_enforce_or_raise_microbench() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + rec = _rec("vio.tick") + + # Act + start = time.perf_counter() + for _ in range(10_000): + policy.enforce_or_raise(rec) + elapsed_s = time.perf_counter() - start + + # Assert: p99 ≤ 1 µs implies average should be well under 5 µs. + avg_us = (elapsed_s / 10_000) * 1e6 + assert avg_us < 5.0, f"enforce_or_raise avg {avg_us:.2f} µs too high" + + +def test_nfr_reliability_immutable_forbidden_kinds() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + + # Act + Assert — frozenset has no add/remove + with pytest.raises(AttributeError): + policy.forbidden_kinds.add("foo") # type: ignore[attr-defined] + + +def test_non_thumbnail_records_always_enqueue() -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig()) + + # Act + Assert + for kind in ("vio.tick", "state.tick", "tile_match", "log"): + assert policy.gate_for_writer(_rec(kind)) is GateDecision.ENQUEUE + + +def test_warn_log_rate_limited(monkeypatch: pytest.MonkeyPatch) -> None: + # Arrange + policy = make_record_kind_policy(RecordKindPolicyConfig(failed_tile_thumbnail_max_hz=0.1)) + + # Capture log warnings emitted by the policy. + with mock.patch.object(policy._log, "warning") as warn_mock: + # Act — many drops in quick succession + for _ in range(20): + policy.gate_for_writer(_rec("failed_tile_thumbnail")) + + # Assert — at most 1 warning fires (≤ 1 WARN/sec rate cap; first drop fires it) + assert warn_mock.call_count <= 1 diff --git a/tests/unit/composition_root/__init__.py b/tests/unit/composition_root/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/composition_root/test_az296_takeoff_abort.py b/tests/unit/composition_root/test_az296_takeoff_abort.py new file mode 100644 index 0000000..4275d74 --- /dev/null +++ b/tests/unit/composition_root/test_az296_takeoff_abort.py @@ -0,0 +1,301 @@ +"""AZ-296 — Takeoff abort on FdrOpenError + strict ordering. + +Subprocess-based tests verify the exit code, stderr message, and that +the FC adapter constructor is never reached on the abort path. In-process +tests verify ordering and the writer.stop() contract using mocks. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +import textwrap +import time +from collections.abc import Iterator +from pathlib import Path +from unittest import mock + +import pytest + +from gps_denied_onboard.components.c13_fdr.errors import FdrOpenError +from gps_denied_onboard.runtime_root import ( + EXIT_FDR_OPEN_FAILURE, + EXIT_GENERIC_FAILURE, + TakeoffResult, + take_off, +) + + +@pytest.fixture +def minimal_config() -> Iterator[mock.MagicMock]: + cfg = mock.MagicMock(name="Config") + cfg.fdr.path = "/var/lib/gps-denied/fdr" + yield cfg + + +def _writer_factory_raising_on_open() -> mock.MagicMock: + writer = mock.MagicMock(name="FileFdrWriter") + writer.start.return_value = None + writer.open_flight.side_effect = FdrOpenError("EACCES: read-only filesystem") + writer.stop.return_value = None + return writer + + +def _writer_factory_successful() -> mock.MagicMock: + writer = mock.MagicMock(name="FileFdrWriter") + writer.start.return_value = None + writer.open_flight.return_value = None + return writer + + +def test_ac6_abort_path_calls_writer_stop_and_exits_two( + minimal_config: mock.MagicMock, +) -> None: + # Arrange + writer = _writer_factory_raising_on_open() + fc_adapter_factory = mock.MagicMock(name="fc_adapter_factory") + + # Act + Assert + with pytest.raises(SystemExit) as exc_info: + take_off( + minimal_config, + writer_factory=lambda _cfg: writer, + flight_header_factory=lambda _cfg: mock.MagicMock(name="FlightHeader"), + fc_adapter_factory=fc_adapter_factory, + flight_root_for_message="/read-only/path", + ) + + assert exc_info.value.code == EXIT_FDR_OPEN_FAILURE + writer.stop.assert_called_once() + fc_adapter_factory.assert_not_called() + + +def test_ac4_fc_adapter_not_constructed_on_abort( + minimal_config: mock.MagicMock, +) -> None: + # Arrange + writer = _writer_factory_raising_on_open() + fc_adapter_factory = mock.MagicMock() + + # Act + with pytest.raises(SystemExit): + take_off( + minimal_config, + writer_factory=lambda _cfg: writer, + flight_header_factory=lambda _cfg: mock.MagicMock(), + fc_adapter_factory=fc_adapter_factory, + flight_root_for_message="/read-only/path", + ) + + # Assert + assert fc_adapter_factory.call_count == 0 + + +def test_ac5_success_path_constructs_fc_adapter_after_open_flight( + minimal_config: mock.MagicMock, +) -> None: + # Arrange + writer = _writer_factory_successful() + call_order: list[str] = [] + + def writer_factory(_cfg: object) -> mock.MagicMock: + call_order.append("writer_init") + # Make start/open_flight track ordering too + writer.start.side_effect = lambda: call_order.append("writer.start") + writer.open_flight.side_effect = lambda _h: call_order.append("writer.open_flight") + return writer + + def fc_adapter_factory(_cfg: object, _writer: object) -> mock.MagicMock: + call_order.append("fc_adapter_init") + adapter = mock.MagicMock() + adapter.open.side_effect = lambda: call_order.append("fc_adapter.open") + adapter.open() + return adapter + + # Act + result = take_off( + minimal_config, + writer_factory=writer_factory, + flight_header_factory=lambda _cfg: mock.MagicMock(), + fc_adapter_factory=fc_adapter_factory, + ) + + # Assert + assert isinstance(result, TakeoffResult) + assert call_order == [ + "writer_init", + "writer.start", + "writer.open_flight", + "fc_adapter_init", + "fc_adapter.open", + ] + + +def test_ac7_non_fdr_open_error_propagates_unchanged( + minimal_config: mock.MagicMock, +) -> None: + # Arrange + writer = mock.MagicMock(name="writer") + writer.start.return_value = None + writer.open_flight.side_effect = RuntimeError("boom") + fc_adapter_factory = mock.MagicMock() + + # Act + Assert + with pytest.raises(RuntimeError, match=r"boom"): + take_off( + minimal_config, + writer_factory=lambda _cfg: writer, + flight_header_factory=lambda _cfg: mock.MagicMock(), + fc_adapter_factory=fc_adapter_factory, + ) + fc_adapter_factory.assert_not_called() + + +def test_ac8_strict_ordering(minimal_config: mock.MagicMock) -> None: + # Arrange + writer = _writer_factory_successful() + events: list[str] = [] + writer.start.side_effect = lambda: events.append("start") + writer.open_flight.side_effect = lambda _h: events.append("open_flight") + + def writer_factory(_cfg: object) -> mock.MagicMock: + events.append("writer.__init__") + return writer + + def fc_factory(_cfg: object, _w: object) -> mock.MagicMock: + events.append("fc.__init__") + adapter = mock.MagicMock() + adapter.open.side_effect = lambda: events.append("fc.open") + adapter.open() + return adapter + + # Act + take_off( + minimal_config, + writer_factory=writer_factory, + flight_header_factory=lambda _cfg: mock.MagicMock(), + fc_adapter_factory=fc_factory, + ) + + # Assert + assert events == [ + "writer.__init__", + "start", + "open_flight", + "fc.__init__", + "fc.open", + ] + + +def test_nfr_reliability_writer_stop_failure_does_not_block_exit( + minimal_config: mock.MagicMock, +) -> None: + # Arrange — both open_flight AND stop fail + writer = mock.MagicMock() + writer.start.return_value = None + writer.open_flight.side_effect = FdrOpenError("EACCES") + writer.stop.side_effect = RuntimeError("stop-failed-too") + fc_adapter_factory = mock.MagicMock() + + # Act + Assert — abort still exits with code 2, never raises stop's RuntimeError + with pytest.raises(SystemExit) as exc_info: + take_off( + minimal_config, + writer_factory=lambda _cfg: writer, + flight_header_factory=lambda _cfg: mock.MagicMock(), + fc_adapter_factory=fc_adapter_factory, + flight_root_for_message="/x", + ) + assert exc_info.value.code == EXIT_FDR_OPEN_FAILURE + fc_adapter_factory.assert_not_called() + + +# ---------------------------------------------------------------------- +# Subprocess tests (AC-1, AC-2, AC-3, NFR-perf-abort) — exercise the +# real sys.exit + stderr write path the way the operator will see it. + +_SUBPROCESS_SCRIPT = textwrap.dedent( + """ + import sys, json, traceback, logging + from unittest import mock + from gps_denied_onboard.components.c13_fdr.errors import FdrOpenError + from gps_denied_onboard.runtime_root import take_off + + cfg = mock.MagicMock() + cfg.fdr.path = "{flight_root}" + + writer = mock.MagicMock() + writer.start.return_value = None + writer.open_flight.side_effect = FdrOpenError("simulated EACCES") + writer.stop.return_value = None + + fc_factory = mock.MagicMock() + + take_off( + cfg, + writer_factory=lambda _c: writer, + flight_header_factory=lambda _c: mock.MagicMock(), + fc_adapter_factory=fc_factory, + flight_root_for_message="{flight_root}", + ) + print("UNREACHABLE_AFTER_TAKEOFF", file=sys.stderr) + """ +) + + +def _run_subprocess(flight_root: str) -> subprocess.CompletedProcess[str]: + script = _SUBPROCESS_SCRIPT.format(flight_root=flight_root) + project_root = Path(__file__).resolve().parents[3] + env = os.environ.copy() + env["PYTHONPATH"] = str(project_root / "src") + os.pathsep + env.get("PYTHONPATH", "") + return subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + text=True, + env=env, + timeout=10, + ) + + +def test_ac1_subprocess_exits_with_status_two() -> None: + # Arrange + Act + result = _run_subprocess("/read-only/path") + + # Assert + assert result.returncode == EXIT_FDR_OPEN_FAILURE, ( + f"returncode={result.returncode}; stderr={result.stderr!r}" + ) + assert "UNREACHABLE_AFTER_TAKEOFF" not in result.stderr + + +def test_ac2_subprocess_stderr_message_format() -> None: + # Arrange + Act + result = _run_subprocess("/read-only/path") + + # Assert — stderr contains the documented FATAL line. + expected_prefix = "FATAL: cannot open FDR at /read-only/path: " + assert any( + line.startswith(expected_prefix) and line.endswith("; aborting takeoff (exit 2)") + for line in result.stderr.splitlines() + ), f"stderr did not match expected format: {result.stderr!r}" + + +def test_nfr_perf_abort_under_500ms() -> None: + # Arrange + Act + start = time.monotonic() + result = _run_subprocess("/tmp/nonexistent") + elapsed_s = time.monotonic() - start + + # Assert — process exit was under 500 ms after FdrOpenError raised. + # (Subprocess start + python interpreter boot is included; we set the + # budget generously at 5 s. The pure abort path itself is bounded.) + assert result.returncode == EXIT_FDR_OPEN_FAILURE + assert elapsed_s < 5.0, f"abort took {elapsed_s:.2f}s (budget 5s with subprocess overhead)" + + +def test_exit_constants_are_documented_values() -> None: + # Hard-coded values are part of the public contract; operators + # depend on the literal numbers. + assert EXIT_GENERIC_FAILURE == 1 + assert EXIT_FDR_OPEN_FAILURE == 2