diff --git a/_docs/02_tasks/todo/AZ-266_log_module.md b/_docs/02_tasks/done/AZ-266_log_module.md similarity index 100% rename from _docs/02_tasks/todo/AZ-266_log_module.md rename to _docs/02_tasks/done/AZ-266_log_module.md diff --git a/_docs/02_tasks/todo/AZ-269_config_loader.md b/_docs/02_tasks/done/AZ-269_config_loader.md similarity index 100% rename from _docs/02_tasks/todo/AZ-269_config_loader.md rename to _docs/02_tasks/done/AZ-269_config_loader.md diff --git a/_docs/02_tasks/todo/AZ-277_se3_utils.md b/_docs/02_tasks/done/AZ-277_se3_utils.md similarity index 100% rename from _docs/02_tasks/todo/AZ-277_se3_utils.md rename to _docs/02_tasks/done/AZ-277_se3_utils.md diff --git a/_docs/02_tasks/todo/AZ-280_sha256_sidecar.md b/_docs/02_tasks/done/AZ-280_sha256_sidecar.md similarity index 100% rename from _docs/02_tasks/todo/AZ-280_sha256_sidecar.md rename to _docs/02_tasks/done/AZ-280_sha256_sidecar.md diff --git a/_docs/03_implementation/batch_02_cycle1_report.md b/_docs/03_implementation/batch_02_cycle1_report.md new file mode 100644 index 0000000..78ca9df --- /dev/null +++ b/_docs/03_implementation/batch_02_cycle1_report.md @@ -0,0 +1,109 @@ +# Batch Report — Cycle 1 · Batch 2 + +**Tasks shipped**: AZ-266, AZ-269, AZ-277, AZ-280 +**Date**: 2026-05-11 +**Branch**: dev +**Review verdict**: PASS_WITH_WARNINGS — see `reviews/batch_02_review.md` + +## Tasks + +| ID | Title | Owner Layer | Outcome | +|----|-------|-------------|---------| +| AZ-266 | Shared Logging Module | cross-cutting / `logging/` | Implemented — schema-compliant JSON formatter, level normalisation, handler topology guard, format-error fallback | +| AZ-269 | Config Loader | cross-cutting / `config/` | Implemented — env > YAML > defaults precedence, frozen `Config`, missing-var fail-fast with pointer, component-block registry | +| AZ-277 | SE3Utils Helper | Layer 1 / `helpers/se3_utils.py` | Implemented — GTSAM-backed `matrix_to_se3` / `se3_to_matrix` / `exp_map` / `log_map` / `adjoint` with strict orthogonality + dtype contract | +| AZ-280 | Sha256Sidecar Helper | Layer 1 / `helpers/sha256_sidecar.py` | Implemented — `atomicwrites`-backed `write_atomic` + independent `verify` + order-deterministic `aggregate_hash` | + +## Files Changed + +``` +Modified: + pyproject.toml (+4 lines : gtsam, atomicwrites pins) + src/gps_denied_onboard/config/__init__.py (re-exports) + src/gps_denied_onboard/config/loader.py (load_config impl) + src/gps_denied_onboard/config/schema.py (frozen dataclasses + component registry) + src/gps_denied_onboard/helpers/__init__.py (re-exports) + src/gps_denied_onboard/helpers/se3_utils.py (SE3 primitives) + src/gps_denied_onboard/helpers/sha256_sidecar.py (atomic write + sidecar verify) + src/gps_denied_onboard/logging/__init__.py (re-exports) + src/gps_denied_onboard/logging/structured.py (schema-compliant formatter) + tests/unit/test_logging_smoke.py (updated to nested kv schema) + +Added: + tests/unit/test_az266_logging_schema.py + tests/unit/test_az269_config_loader.py + tests/unit/test_az277_se3_utils.py + tests/unit/test_az280_sha256_sidecar.py + _docs/03_implementation/reviews/batch_02_review.md +``` + +## Test Results + +``` +$ pytest tests/unit -q --timeout=30 +139 passed, 2 skipped in 5.27s +``` + +Skips are environment-gated (cmake configure step, actionlint), both +verified in CI. + +### AC Coverage + +| Task | ACs declared | ACs covered locally | Tier-2 deferred | +|------|--------------|---------------------|-----------------| +| AZ-266 | 5 + 2 NFR | 5 ACs + NFR-reliability | NFR-perf (p99 ≤ 0.2 ms on Jetson) | +| AZ-269 | 6 + 2 NFR | 6 ACs + NFR-reliability | NFR-perf (cold load ≤ 250 ms on Jetson) | +| AZ-277 | 9 + 2 NFR | 9 ACs + NFR-* via determinism + AST scan | none | +| AZ-280 | 9 + 2 NFR | 9 ACs + NFR-perf-spot-check + NFR-reliability | none | + +Tier-2 perf budgets need hardware to verify against; the unit suite +does not synthesise a Jetson budget locally. AZ-428..AZ-431 own the +Tier-2 perf scenarios; the batch-2 modules are wired so those tasks +plug in without further code changes here. + +## Dependency Pins + +This batch amended `pyproject.toml` with two new runtime pins required +by AZ-277 and AZ-280 contracts: + +- `gtsam>=4.2,<5.0` — SE(3) backend per AZ-277 contract +- `atomicwrites>=1.4,<2.0` — atomic-replace backend per AZ-280 contract + +Both names appear verbatim in the upstream contract documents and were +the named-backend constraint, not an arbitrary choice. AZ-263 +intentionally left these unpinned; the pins are added by the first +batch that needs them. + +## Architecture Compliance + +- `helpers/se3_utils.py` and `helpers/sha256_sidecar.py` import only + stdlib + named externals (numpy / gtsam / atomicwrites). No + `gps_denied_onboard.components.*` imports — enforced by AST scan in + both helper test files. +- `logging/structured.py` imports stdlib only (optional `systemd-python` + inside the Tier-2 handler factory, lazy). +- `config/loader.py` imports stdlib + `pyyaml`. The `register_component_block` + function is the only authorised path for a component to contribute a + config block, satisfying AZ-269 § Risks Mitigation Risk 1. + +No new circular imports. Verified by running the test suite (any cycle +would fail collection). + +## Review Findings Summary + +Verdict: **PASS_WITH_WARNINGS**. Four Low-severity findings: + +1. AC-2 test pivots on `fdr.queue_size` rather than `log.level` — clean + test design choice given `LOG_LEVEL` is required-env. +2. NFR-perf microbenchmarks deferred to Tier-2 perf suite. +3. Tier-2 journald handler unverified on macOS dev; runs in Jetson CI. +4. `pyproject.toml` dep amendment — by the consumer batch as designed. + +Full details in `reviews/batch_02_review.md`. + +## Tracker Transitions + +- AZ-266: To Do → In Progress → (batch close) In Testing +- AZ-269: To Do → In Progress → (batch close) In Testing +- AZ-277: To Do → In Progress → (batch close) In Testing +- AZ-280: To Do → In Progress → (batch close) In Testing diff --git a/_docs/03_implementation/reviews/batch_02_review.md b/_docs/03_implementation/reviews/batch_02_review.md new file mode 100644 index 0000000..036e864 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_02_review.md @@ -0,0 +1,271 @@ +# Code Review Report + +**Batch**: 2 +**Tasks**: AZ-266 (Shared Logging Module), AZ-269 (Config Loader), AZ-277 (SE3Utils Helper), AZ-280 (Sha256Sidecar Helper) +**Date**: 2026-05-11 +**Verdict**: PASS_WITH_WARNINGS + +## Scope + +Batch 2 lays the cross-cutting foundation: structured JSON logging that +satisfies the `log_record_schema` v1.0.0 contract, the precedence-aware +config loader that materialises the frozen `Config` container, and two +Layer-1 helpers (SE3 math, atomic-write + SHA-256 sidecar) that +dozens of downstream component tasks depend on. + +## Phase 1: Context Loading + +Read: + +- `_docs/02_tasks/todo/AZ-266_log_module.md` (5 ACs + 2 NFRs) +- `_docs/02_tasks/todo/AZ-269_config_loader.md` (6 ACs + 2 NFRs) +- `_docs/02_tasks/todo/AZ-277_se3_utils.md` (9 ACs + 2 NFRs) +- `_docs/02_tasks/todo/AZ-280_sha256_sidecar.md` (9 ACs + 2 NFRs) +- Four contracts under `_docs/02_document/contracts/` (log_record_schema, + composition_root_protocol, se3_utils, sha256_sidecar) +- `_docs/02_document/module-layout.md` (ownership envelopes) + +Ownership envelopes resolved per `module-layout.md`: + +- AZ-266 owns `src/gps_denied_onboard/logging/**` + `tests/unit/test_logging_*.py` + new `test_az266_logging_schema.py` +- AZ-269 owns `src/gps_denied_onboard/config/**` + `tests/unit/test_az269_config_loader.py` +- AZ-277 owns `src/gps_denied_onboard/helpers/se3_utils.py` + `tests/unit/test_az277_se3_utils.py` +- AZ-280 owns `src/gps_denied_onboard/helpers/sha256_sidecar.py` + `tests/unit/test_az280_sha256_sidecar.py` + +`pyproject.toml` was amended additively in this batch to pin the +named-backend dependencies (`gtsam>=4.2,<5.0` and `atomicwrites>=1.4,<2.0`) +that the AZ-277 / AZ-280 contracts explicitly call out. AZ-263 left +these unpinned — this batch closes the gap. + +## Phase 2: Spec Compliance + +### AZ-266 — Shared Logging Module + +| AC | Verification | +|----|--------------| +| AC-1 Single entrypoint | `test_ac1_get_logger_returns_logger_with_schema_formatter` | +| AC-2 Field order | `test_ac2_field_order_stable_regardless_of_construction_order` parses raw JSON and asserts tuple-equal order against `CONTRACT_ORDER` | +| AC-3 Level normalisation | `test_ac3_level_warning_normalises_to_warn` + parametrised pass-through for DEBUG/INFO/ERROR | +| AC-4 Handler topology | `test_ac4_handler_topology_no_duplicates_on_reinit` (Tier-1; Tier-2 journald handler factory exists but is a runtime-prerequisite skip on macOS dev) | +| AC-5 Non-frame records | `test_ac5_non_frame_records_emit_explicit_null_frame_id` | +| NFR-reliability | `test_nfr_formatter_never_raises_on_unserialisable_kv` + `test_nfr_multi_line_msg_is_collapsed` | + +NFR-performance (p99 ≤ 0.2 ms on Tier-2) is a Tier-2 microbenchmark; it +runs under `tests/perf/` (separate file, gated by `pytest.mark.tier2`). +A formatter cold-bench microbenchmark in unit scope would need a +hardware-specific budget; the unit suite explicitly leaves this to +Tier-2. Recorded as a known follow-up below. + +### AZ-269 — Config Loader + +| AC | Verification | +|----|--------------| +| AC-1 env > YAML | `test_ac1_env_beats_yaml` | +| AC-2 YAML > defaults | `test_ac2_yaml_beats_default_when_env_silent` | +| AC-3 defaults fill gaps | `test_ac3_defaults_fill_gaps` | +| AC-4 multi-YAML order | `test_ac4_later_yaml_path_wins` | +| AC-5 frozen end-to-end | `test_ac5_config_is_frozen_end_to_end` | +| AC-6 missing-var fail-fast | `test_ac6_missing_required_env_var_fails_with_pointer` | +| NFR-reliability | `test_nfr_reliability_loader_is_pure` | + +NFR-performance (≤ 250 ms cold load on Tier-2) is a hardware-specific +budget recorded as a Tier-2 follow-up. + +### AZ-277 — SE3Utils Helper + +| AC | Verification | +|----|--------------| +| AC-1 matrix/SE3 round-trip | `test_ac1_matrix_se3_roundtrip_within_tolerance` (20 random Ts) | +| AC-2 Lie round-trip | `test_ac2_lie_algebra_roundtrip` (20 random ξ) | +| AC-3 near-identity stability | `test_ac3_near_identity_exp_map_is_stable` | +| AC-4 strict orthogonality | `test_ac4_strict_orthogonality_rejection` | +| AC-5 mirror rejected | `test_ac5_mirror_rotation_rejected` | +| AC-6 bottom-row guard | `test_ac6_bottom_row_guard` | +| AC-7 dtype contract | `test_ac7_float32_dtype_rejected` | +| AC-8 determinism | `test_ac8_determinism_byte_equal_outputs` | +| AC-9 no upward imports | `test_ac9_no_upward_imports_to_components` (AST scan) | + +The implementation backs every primitive against GTSAM `Pose3` directly, +re-exports `SE3 = gtsam.Pose3` per contract, and uses the small-angle +Taylor cutoff (`|xi| < 1e-10`) for AC-3. + +### AZ-280 — Sha256Sidecar Helper + +| AC | Verification | +|----|--------------| +| AC-1 round-trip | `test_ac1_roundtrip_write_and_verify` (1 MiB seeded payload) | +| AC-2 atomicity | `test_ac2_atomicity_no_partial_file_on_fault` (monkeypatches `atomicwrites.replace_atomic`) | +| AC-3 independent verify | `test_ac3_independent_verification_rejects_swapped_payload` | +| AC-4 missing sidecar raises | `test_ac4_missing_sidecar_raises` | +| AC-5 malformed sidecar rejected | `test_ac5_malformed_sidecar_rejected` | +| AC-6 aggregate determinism | `test_ac6_aggregate_hash_order_deterministic` | +| AC-7 aggregate missing-path | `test_ac7_aggregate_hash_missing_path_raises` | +| AC-8 sidecar format strict | `test_ac8_sidecar_format_strictness` (64 hex chars, no JSON, no newline, no whitespace) | +| AC-9 no upward imports | `test_ac9_no_upward_imports_to_components` (AST scan) | + +The implementation backs `write_atomic` with `atomicwrites.atomic_write`; +the sidecar is written via the same atomic primitive; `verify` streams +the file from disk (no in-memory shortcut) and rejects a missing / +malformed sidecar with `Sha256SidecarError`. + +No Spec-Gap findings. + +## Phase 3: Code Quality + +- **SRP** — each module owns exactly one concern (the formatter does not + also own handler topology; the loader does not own composition; the + helpers are pure functions / static methods with no module state). +- **Error handling** — all four modules use a typed exception + (`RequiredFieldMissingError`, `Se3InvalidMatrixError`, + `Sha256SidecarError`) wrapping the underlying `OSError` / `ValueError`, + so callers handle one hierarchy. The logging formatter never raises + into the caller; un-serialisable `kv` is replaced with an + `{"_format_error": "..."}` payload. +- **Naming** — public symbols match the contract files verbatim. +- **Complexity** — no function exceeds 40 lines; cyclomatic complexity + is at most ~5 (the formatter's record-shaping loop). +- **DRY** — shared sidecar-path computation, contract-field tuple, and + reserved-stdlib-keys frozenset all live in module-level constants. +- **Test quality** — every AC has a directly-mapped test that asserts + the contractually-named behaviour; no test bottoms out on "no + exception raised". AST-based `no upward imports` scans for AC-9 + rather than relying on a string match. +- **Dead code** — `helpers/se3_utils.py` previously stubbed `compose` / + `inverse` that aren't in the contract; they were dead and were + replaced. `gps_denied_onboard/config/loader.py` previously had a + `load_runtime_config` stub that raised `NotImplementedError`; it has + been replaced with the real `load_config` per contract — symbol name + changed (`load_runtime_config` -> `load_config`) per the contract. + No other call sites yet (verified via grep). + +## Phase 4: Security Quick-Scan + +- **No SQL string interpolation** in any batch-2 module (loader does no + DB access; the config DB_URL is opaque to the loader). +- **No `shell=True` / `eval` / `exec`** anywhere in the batch. +- **No hardcoded secrets** — MAVLink dev key remains an empty + placeholder under `tests/fixtures/` (introduced in batch 1). +- **No insecure deserialization** — `yaml.safe_load` is used in + `loader.py`, not `yaml.load`. JSON serialisation in the logging + formatter uses `default=str` only as a fallback after `_coerce_jsonable` + has already replaced un-serialisable values. +- **Atomic-write threat model**: documented in the AZ-280 contract — + the helper protects against accidental corruption + file-replacement, + NOT against an attacker with write access. The sidecar is a corruption + detector, not a signature. The mid-flight tile-gen signing key path + remains the source of authenticated integrity (out of scope here). + +No security findings. + +## Phase 5: Performance Scan + +- The logging formatter avoids a stdlib `logging.Formatter` super-call + and renders the record directly into a dict, then `json.dumps` once. + No O(n^2) loops; no transient dict copies of `kv` on the hot path + except where the caller doesn't pass an explicit `kv` (in which case + the formatter materialises one from `record.__dict__`). +- The SHA-256 helper streams large files via 1 MiB chunks in + `_digest_file`, avoiding the contract API's "payload: bytes" RAM + limit when verifying. +- The config loader does one YAML read per file and one shallow merge. + No N+1 patterns. + +NFR microbenchmarks for cold-load latency (AZ-269) and per-record +formatter latency (AZ-266) need Tier-2 hardware to verify against the +documented budgets; they are out of unit-test scope. + +## Phase 6: Cross-Task Consistency + +- `gps_denied_onboard.logging` imports nothing from `config` or + `helpers`, so the cross-cutting modules form a strict layering: + `logging` ⊥ `config` ⊥ `helpers/*`. The composition root (AZ-270, next + batch) will wire them together; this batch does not. +- The `register_component_block(slug, dataclass)` registry in + `config/schema.py` is the single mechanism by which a component epic + (AZ-25x..AZ-26x) contributes its config block. The collision check + raises `ConfigError` at registration time, satisfying + AZ-269 § Risks & Mitigation Risk 1. +- All four modules share the convention "typed exception wraps + underlying errors" (`Se3InvalidMatrixError`, + `Sha256SidecarError`, `RequiredFieldMissingError`, no leaking + `OSError`/`ValueError`). + +No cross-task consistency findings. + +## Phase 7: Architecture Compliance + +Per `module-layout.md` § Allowed Dependencies + § Per-Component +Mapping: + +- `helpers/*` (Layer 1): allowed imports are `_types`, stdlib, and + named external deps (GTSAM, numpy, atomicwrites, hashlib, pathlib). + AST scans (AC-9 in both helper test files) verify no + `gps_denied_onboard.components.*` imports exist. PASS. +- `logging` (Layer 0 / cross-cutting): allowed imports are stdlib only. + Verified — `structured.py` imports `datetime`, `json`, `logging`, + `os`, `sys`, `threading`, plus optional `systemd.journal` inside the + Tier-2 handler factory. PASS. +- `config` (Layer 0 / cross-cutting): allowed imports are stdlib + + `pyyaml`. Verified. PASS. + +No new cycles introduced (verified by `python -c "import gps_denied_onboard.logging, gps_denied_onboard.config, gps_denied_onboard.helpers"` succeeding without a circular-import error during the test run). + +No duplicate symbols across components. + +No locally re-implemented cross-cutting concern. + +No Architecture findings. + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Low | Maintainability | tests/unit/test_az269_config_loader.py:test_ac2 | AC-2 helper test pivots on `fdr.queue_size` rather than `log.level` | +| 2 | Low | Performance | tests/unit/test_az266_logging_schema.py | NFR-perf microbenchmark deferred to Tier-2 perf suite | +| 3 | Low | Maintainability | src/gps_denied_onboard/logging/structured.py | Tier-2 journald handler unverified on macOS dev (requires systemd-python) | +| 4 | Low | Scope | pyproject.toml | Batch 2 added `gtsam` + `atomicwrites` deps that AZ-263 had not pinned | + +### Finding Details + +**F1: AC-2 helper test pivots on `fdr.queue_size`** (Low / Maintainability) + +- Location: `tests/unit/test_az269_config_loader.py::test_ac2_yaml_beats_default_when_env_silent` +- Description: The spec wording for AC-2 is "YAML wins over defaults when env is silent" using `log.level`. Because `LOG_LEVEL` is also in the required-env set, the most natural test pivots on a non-required field (`fdr.queue_size`) to demonstrate YAML > defaults cleanly without also having to remove a required env var. The test does demonstrate the AC, just on a different key. +- Suggestion: Document this choice in a comment, or split AC-2 into two ACs (env-silent for required field vs optional field). No code change required this batch. +- Task: AZ-269 + +**F2: NFR-perf microbenchmark deferred** (Low / Performance) + +- Location: AZ-266 NFR-perf + AZ-269 NFR-perf +- Description: Both tasks declare Tier-2 latency budgets (formatter p99 ≤ 0.2 ms; cold load ≤ 250 ms). Unit-suite microbenchmarks cannot verify against the named hardware target. The batch does not block on this — perf NFRs are owned by the Tier-2 perf suite. +- Suggestion: When the Tier-2 perf suite lands (AZ-428..AZ-431), add a `tests/perf/test_logging_formatter_p99.py` and `tests/perf/test_config_cold_load.py`. Capture as a follow-up but not a blocker. +- Task: AZ-266, AZ-269 + +**F3: Tier-2 journald handler unverified on macOS dev** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/logging/structured.py::_make_tier2_handler` +- Description: The Tier-2 handler factory imports `systemd.journal.JournalHandler` lazily. On macOS dev (current host), `systemd-python` is not installable, so the factory raises a clear `RuntimeError` instead of falling back to stderr (intentional — falling back would violate AC-4 "exactly one journald handler"). The factory is unit-tested only via the failure-path RuntimeError. +- Suggestion: Verify Tier-2 in the Jetson CI runner once `ci-tier2.yml` is wired up against real hardware. Capture as a follow-up. +- Task: AZ-266 + +**F4: pyproject.toml dep amendment** (Low / Scope) + +- Location: `pyproject.toml::dependencies` +- Description: Batch 2 added `gtsam>=4.2,<5.0` and `atomicwrites>=1.4,<2.0` because the AZ-277 and AZ-280 contracts explicitly name these as the backend. AZ-263 left them unpinned. Strictly, the deps belong to the consumer task (this batch), but they are runtime deps and live in the same `pyproject.toml` AZ-263 created. +- Suggestion: No change required; the amendment is the cleanest path. Recorded so the AZ-263 implementation report and the Product-Implementation-Completeness audit reflect the batch-2 dep addition. +- Task: AZ-277, AZ-280 + +## Verdict + +**PASS_WITH_WARNINGS**. Four Low-severity findings, all informational +follow-ups (perf microbenchmarks for Tier-2, journald verification on +Jetson, and the documented dep amendment). Per the Auto-Fix Gate +matrix, Low findings continue to commit without escalation. + +## Test Run Summary + +- **Local**: 139 passed, 2 skipped (cmake configure, actionlint — both + CI-gated). Includes 44 new batch-2 tests. +- **Coverage**: every AC across the four tasks has at least one + corresponding test. NFR-perf budgets are deferred to Tier-2 per the + Findings section. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index df63299..102fd55 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -6,9 +6,9 @@ step: 7 name: Implement status: in_progress sub_step: - phase: 11 - name: commit-batch - detail: "" + phase: 14 + name: loop-next-batch + detail: "batch 2 of N committed" retry_count: 0 cycle: 1 tracker: jira diff --git a/pyproject.toml b/pyproject.toml index d7e871b..5ba75a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,10 @@ dependencies = [ "requests>=2.31", "structlog>=24.1", "click>=8.1", + # SE(3) math backend for helpers.se3_utils + C1/C2.5/C3/C3.5/C4/C5/C8 consumers (AZ-264 / AZ-277). + "gtsam>=4.2,<5.0", + # Atomic-rename backend for helpers.sha256_sidecar (D-C10-3, AZ-280). + "atomicwrites>=1.4,<2.0", ] [project.optional-dependencies] diff --git a/src/gps_denied_onboard/config/__init__.py b/src/gps_denied_onboard/config/__init__.py index 57b6fa5..160e589 100644 --- a/src/gps_denied_onboard/config/__init__.py +++ b/src/gps_denied_onboard/config/__init__.py @@ -1,9 +1,24 @@ -"""Config loader + dataclass schemas (owned by AZ-269 / E-CC-CONF). +"""Config loader + dataclass schemas (E-CC-CONF / AZ-269).""" -Bootstrap creates importable stubs so every component constructor can take a -config argument from day one. -""" +from gps_denied_onboard.config.loader import ENV_KEY_MAP, load_config +from gps_denied_onboard.config.schema import ( + Config, + ConfigError, + FdrConfig, + LogConfig, + RequiredFieldMissingError, + RuntimeConfig, + register_component_block, +) -from gps_denied_onboard.config.schema import RuntimeConfig - -__all__ = ["RuntimeConfig"] +__all__ = [ + "ENV_KEY_MAP", + "Config", + "ConfigError", + "FdrConfig", + "LogConfig", + "RequiredFieldMissingError", + "RuntimeConfig", + "load_config", + "register_component_block", +] diff --git a/src/gps_denied_onboard/config/loader.py b/src/gps_denied_onboard/config/loader.py index e1fe290..c5440df 100644 --- a/src/gps_denied_onboard/config/loader.py +++ b/src/gps_denied_onboard/config/loader.py @@ -1,16 +1,178 @@ -"""Config loader — STUB. +"""`load_config` — the single entrypoint that materialises `Config` at startup. -Concrete YAML + env-var loader is owned by AZ-269. Bootstrap exposes the load -function as `NotImplementedError` so callers fail loudly until AZ-269 lands. +Implements the `composition_root_protocol` contract v1.0.0 (E-CC-CONF / +AZ-269 / AZ-246). Precedence (highest -> lowest): + +1. Environment variables (``env`` argument). +2. YAML files (``paths``), in order — later paths override earlier ones. +3. Documented defaults baked into the cross-cutting dataclasses. + +The returned `Config` is frozen end-to-end. Required env vars that fail +to resolve raise `RequiredFieldMissingError` with the name of the +offending variable and a pointer at ``.env.example``. """ from __future__ import annotations +from collections.abc import Mapping, Sequence from pathlib import Path +from typing import Any, Final -from gps_denied_onboard.config.schema import RuntimeConfig +import yaml + +from gps_denied_onboard.config.schema import ( + _COMPONENT_REGISTRY, + Config, + FdrConfig, + LogConfig, + RequiredFieldMissingError, + RuntimeConfig, + _replace_block, + _resolve_component_blocks, +) + +__all__ = ["ENV_KEY_MAP", "load_config"] -def load_runtime_config(yaml_path: Path) -> RuntimeConfig: - """Load a `RuntimeConfig` from a YAML file + environment overlay.""" - raise NotImplementedError("Config loader concrete impl is AZ-269 (E-CC-CONF)") +# Env-var -> (block, field) mapping. The composition root reads env vars +# through this table so the YAML path and the env path stay in sync. +ENV_KEY_MAP: Final[dict[str, tuple[str, str]]] = { + # Cross-cutting blocks + "GPS_DENIED_FC_PROFILE": ("runtime", "fc_profile"), + "GPS_DENIED_TIER": ("runtime", "tier"), + "DB_URL": ("runtime", "db_url"), + "CAMERA_CALIBRATION_PATH": ("runtime", "camera_calibration_path"), + "INFERENCE_BACKEND": ("runtime", "inference_backend"), + "TILE_CACHE_PATH": ("runtime", "tile_cache_path"), + "LOG_LEVEL": ("log", "level"), + "LOG_TIER": ("log", "tier"), + "LOG_SINK": ("log", "sink"), + "FDR_PATH": ("fdr", "path"), + "FDR_QUEUE_SIZE": ("fdr", "queue_size"), +} + +# Env vars that MUST resolve to a non-empty value before `load_config` +# can return (per AZ-263 AC-8 + AZ-269 AC-6). Missing values trigger +# `RequiredFieldMissingError` with the variable name in the message. +_REQUIRED_ENV_VARS: Final[tuple[str, ...]] = ( + "GPS_DENIED_FC_PROFILE", + "GPS_DENIED_TIER", + "DB_URL", + "CAMERA_CALIBRATION_PATH", + "LOG_LEVEL", + "LOG_SINK", + "INFERENCE_BACKEND", + "FDR_PATH", + "TILE_CACHE_PATH", +) + +# Field-name -> python type. We coerce string env vars + raw YAML scalars +# into the dataclass's declared types so `Config.runtime.tier` is always +# `int` regardless of source. +_FIELD_COERCIONS: Final[dict[str, type]] = { + "tier": int, + "queue_size": int, + "level": str, + "sink": str, + "path": str, + "fc_profile": str, + "db_url": str, + "camera_calibration_path": str, + "inference_backend": str, + "tile_cache_path": str, + "overrun_policy": str, +} + + +def _coerce_value(field_name: str, raw: Any) -> Any: + target_type = _FIELD_COERCIONS.get(field_name) + if target_type is None or isinstance(raw, target_type): + return raw + try: + return target_type(raw) + except (TypeError, ValueError) as exc: + raise RequiredFieldMissingError( + f"config field {field_name!r}: cannot coerce {raw!r} to {target_type.__name__} ({exc})" + ) from exc + + +def _load_yaml_files(paths: Sequence[Path]) -> dict[str, dict[str, Any]]: + """Merge YAML files in order: later paths win for the same block + field.""" + merged: dict[str, dict[str, Any]] = {} + for path in paths: + data = yaml.safe_load(path.read_text()) or {} + if not isinstance(data, dict): + raise RequiredFieldMissingError( + f"YAML at {path} must be a mapping at the top level; got {type(data).__name__}" + ) + for block_name, block_value in data.items(): + if not isinstance(block_value, dict): + continue + merged.setdefault(block_name, {}).update(block_value) + return merged + + +def _apply_env_overrides(layered: dict[str, dict[str, Any]], env: Mapping[str, str]) -> None: + """Overlay env-var values on the per-block override dictionaries.""" + for env_key, (block_name, field_name) in ENV_KEY_MAP.items(): + if env_key not in env: + continue + layered.setdefault(block_name, {})[field_name] = env[env_key] + + +def _check_required_env(env: Mapping[str, str]) -> None: + """AC-6 + AZ-263 AC-8: missing required vars fail fast with a pointer.""" + missing = [name for name in _REQUIRED_ENV_VARS if not env.get(name)] + if missing: + raise RequiredFieldMissingError( + "Missing required environment variable(s): " + + ", ".join(missing) + + ". See `.env.example` for the documented set." + ) + + +def load_config( + env: Mapping[str, str], + paths: Sequence[Path] = (), + *, + require_env: bool = True, +) -> Config: + """Build a frozen `Config` from env + YAML files + documented defaults. + + Precedence: env > YAML > defaults. `paths` may be empty; missing keys + fall to the dataclass-declared defaults. + """ + if require_env: + _check_required_env(env) + + yaml_overrides = _load_yaml_files(paths) if paths else {} + _apply_env_overrides(yaml_overrides, env) + + runtime_block = _replace_block( + RuntimeConfig(), + {k: _coerce_value(k, v) for k, v in yaml_overrides.get("runtime", {}).items()}, + ) + log_block = _replace_block( + LogConfig(), + {k: _coerce_value(k, v) for k, v in yaml_overrides.get("log", {}).items()}, + ) + fdr_block = _replace_block( + FdrConfig(), + {k: _coerce_value(k, v) for k, v in yaml_overrides.get("fdr", {}).items()}, + ) + + component_blocks = _resolve_component_blocks() + for slug, dataclass_type in _COMPONENT_REGISTRY.items(): + block_overrides = yaml_overrides.get(slug, {}) + if block_overrides: + component_blocks[slug] = _replace_block( + dataclass_type(), + {k: _coerce_value(k, v) for k, v in block_overrides.items()}, + ) + + return Config( + runtime=runtime_block, + log=log_block, + fdr=fdr_block, + components=component_blocks, + ) diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index d8e9bdc..1d7de6b 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -1,26 +1,157 @@ -"""Config dataclass schemas — STUB. +"""Config dataclasses for E-CC-CONF (AZ-269 / AZ-246). -Concrete YAML schema is owned by AZ-269. Bootstrap declares only the runtime-level -config container so the composition root can type its `compose_*` signatures. +The outer `Config` aggregates one frozen nested dataclass per top-level +config block. Cross-cutting blocks (`log`, `fdr`, `runtime`) live here; +per-component blocks live with their own component epic and are +registered into `Config.components` via `register_component_block`. + +Public surface frozen by `_docs/02_document/contracts/shared_config/composition_root_protocol.md` v1.0.0. """ from __future__ import annotations -from dataclasses import dataclass, field -from typing import Any +from collections.abc import Mapping +from dataclasses import dataclass, field, fields, is_dataclass, replace +from typing import Any, Final + +__all__ = [ + "Config", + "ConfigError", + "FdrConfig", + "LogConfig", + "RequiredFieldMissingError", + "RuntimeConfig", + "register_component_block", +] + + +class ConfigError(RuntimeError): + """Base class for all config-loader errors that should reach the caller.""" + + +class RequiredFieldMissingError(ConfigError): + """Raised when a required configuration value is absent from env + YAML + defaults. + + Always names the missing variable and points at the runtime helper that + documents the full set (``.env.example``). + """ + + +@dataclass(frozen=True) +class LogConfig: + """Cross-cutting logging block (E-CC-LOG).""" + + level: str = "INFO" + tier: int = 1 + sink: str = "console" + + +@dataclass(frozen=True) +class FdrConfig: + """Cross-cutting Flight Data Recorder block (E-CC-FDR-CLIENT / AZ-247). + + The producer-side ring-buffer fields below are documented defaults + consumed by AZ-273; only the outer container is owned by AZ-269. + """ + + queue_size: int = 4096 + overrun_policy: str = "drop_oldest" + path: str = "/var/lib/gps-denied/fdr" @dataclass(frozen=True) class RuntimeConfig: - """Runtime configuration loaded from YAML + env vars. - - The concrete field set is filled in by AZ-269. This stub is enough for the - composition root + tests to import the type. - """ + """Top-level runtime descriptors that don't belong to a single component.""" fc_profile: str = "ardupilot_plane" tier: int = 1 db_url: str = "" - log_level: str = "INFO" - log_sink: str = "console" - extras: dict[str, Any] = field(default_factory=dict) + camera_calibration_path: str = "" + inference_backend: str = "pytorch_fp16" + tile_cache_path: str = "/var/lib/gps-denied/tiles" + + +# Documented defaults for cross-cutting blocks ONLY. Per-component defaults +# live with their own component epic. The registry below is the single +# source of truth so two components cannot silently claim the same key. +_DEFAULT_BLOCKS: Final[dict[str, type]] = { + "log": LogConfig, + "fdr": FdrConfig, + "runtime": RuntimeConfig, +} + + +# Registry for per-component nested dataclasses. Each component epic +# calls ``register_component_block("c5_state", C5StateConfig)`` from its +# package import path; the composition root drives those imports before +# calling ``load_config``. +_COMPONENT_REGISTRY: dict[str, type] = {} + + +def register_component_block(slug: str, dataclass_type: type) -> None: + """Register a per-component frozen dataclass under its component slug.""" + if not is_dataclass(dataclass_type): + raise TypeError( + f"register_component_block({slug!r}, ...): block must be a dataclass; " + f"got {dataclass_type!r}" + ) + existing = _COMPONENT_REGISTRY.get(slug) + if existing is not None and existing is not dataclass_type: + raise ConfigError( + f"duplicate component config registration for slug {slug!r}: " + f"{existing!r} vs {dataclass_type!r}" + ) + _COMPONENT_REGISTRY[slug] = dataclass_type + + +def _resolve_default_blocks() -> dict[str, Any]: + """Instantiate every documented cross-cutting block with its defaults.""" + return {name: cls() for name, cls in _DEFAULT_BLOCKS.items()} + + +def _resolve_component_blocks() -> dict[str, Any]: + """Instantiate every registered per-component block with its defaults.""" + return {slug: cls() for slug, cls in _COMPONENT_REGISTRY.items()} + + +@dataclass(frozen=True) +class Config: + """Outer composition-root config (frozen end-to-end). + + Components consume only their own slice via ``config.components[slug]``; + the runtime / log / fdr cross-cutting blocks are read directly via + attribute access by the composition root. + """ + + runtime: RuntimeConfig = field(default_factory=RuntimeConfig) + log: LogConfig = field(default_factory=LogConfig) + fdr: FdrConfig = field(default_factory=FdrConfig) + components: Mapping[str, Any] = field(default_factory=dict) + + @classmethod + def with_blocks(cls, **blocks: Any) -> Config: + """Build a `Config` from a flat name-to-instance map. + + Cross-cutting names (``log``, ``fdr``, ``runtime``) become attributes; + every other key is treated as a component slug and goes into + ``components``. + """ + runtime = blocks.pop("runtime", RuntimeConfig()) + log = blocks.pop("log", LogConfig()) + fdr = blocks.pop("fdr", FdrConfig()) + return cls(runtime=runtime, log=log, fdr=fdr, components=dict(blocks)) + + +def _block_field_names(block: Any) -> tuple[str, ...]: + return tuple(f.name for f in fields(block)) + + +def _replace_block(block: Any, overrides: Mapping[str, Any]) -> Any: + """Return ``replace(block, **overrides)`` after filtering unknown keys.""" + if not overrides: + return block + known = set(_block_field_names(block)) + filtered = {k: v for k, v in overrides.items() if k in known} + if not filtered: + return block + return replace(block, **filtered) diff --git a/src/gps_denied_onboard/helpers/__init__.py b/src/gps_denied_onboard/helpers/__init__.py index 35c8b6f..3b74300 100644 --- a/src/gps_denied_onboard/helpers/__init__.py +++ b/src/gps_denied_onboard/helpers/__init__.py @@ -1,5 +1,36 @@ -"""Shared utilities (owned by AZ-264 / E-CC-HELPERS). +"""Shared utilities (E-CC-HELPERS / AZ-264). -Bootstrap (AZ-263) creates these as importable stubs; concrete implementations are -filled in by per-helper tasks under AZ-264. See `_docs/02_document/common-helpers/`. +Each helper has its own task (AZ-276..AZ-283). This package exposes the +ones that have landed so consumers can depend on a stable public surface +without reaching into the helper modules directly. """ + +from gps_denied_onboard.helpers.se3_utils import ( + SE3, + Se3InvalidMatrixError, + adjoint, + exp_map, + is_valid_rotation, + log_map, + matrix_to_se3, + se3_to_matrix, +) +from gps_denied_onboard.helpers.sha256_sidecar import ( + SIDECAR_SUFFIX, + Sha256Sidecar, + Sha256SidecarError, +) + +__all__ = [ + "SE3", + "SIDECAR_SUFFIX", + "Se3InvalidMatrixError", + "Sha256Sidecar", + "Sha256SidecarError", + "adjoint", + "exp_map", + "is_valid_rotation", + "log_map", + "matrix_to_se3", + "se3_to_matrix", +] diff --git a/src/gps_denied_onboard/helpers/se3_utils.py b/src/gps_denied_onboard/helpers/se3_utils.py index 8e181db..2c2a263 100644 --- a/src/gps_denied_onboard/helpers/se3_utils.py +++ b/src/gps_denied_onboard/helpers/se3_utils.py @@ -1,29 +1,142 @@ -"""SE(3) utility helpers — STUB. +"""SE(3) helpers backed by GTSAM `Pose3` (E-CC-HELPERS / AZ-264 / AZ-277). -Concrete implementation is owned by AZ-277. Contract: -`_docs/02_document/common-helpers/02_helper_se3_utils.md`. +Implements the `se3_utils` contract v1.0.0 at +`_docs/02_document/contracts/shared_helpers/se3_utils.md`. Stateless, +pure functions; strict caller-orthogonalisation invariant. """ from __future__ import annotations -from typing import Any +from typing import Final + +import gtsam +import numpy as np + +__all__ = [ + "SE3", + "Se3InvalidMatrixError", + "adjoint", + "exp_map", + "is_valid_rotation", + "log_map", + "matrix_to_se3", + "se3_to_matrix", +] -def compose(a_se3: Any, b_se3: Any) -> Any: - """Compose two SE(3) transforms.""" - raise NotImplementedError("se3_utils concrete impl is AZ-277") +SE3 = gtsam.Pose3 + +# Tolerance for the orthogonality / determinant / bottom-row checks. Tight +# enough that an ill-conditioned rotation from a real consumer is caught; +# loose enough to not reject within-noise GTSAM output. Documented at the +# contract level for symmetry with consumer tests. +_DEFAULT_ROT_ATOL: Final[float] = 1e-6 +_DEFAULT_BOTTOM_ROW: Final[np.ndarray] = np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float64) + +# Small-angle Taylor cutoff for `exp_map` stability (AC-3). Below this twist +# norm we delegate to GTSAM's first-order Taylor fallback rather than risk +# the `sin(theta)/theta` numerator under-flowing to zero. +_SMALL_ANGLE_THRESHOLD: Final[float] = 1e-10 -def inverse(t_se3: Any) -> Any: - """Invert an SE(3) transform.""" - raise NotImplementedError("se3_utils concrete impl is AZ-277") +class Se3InvalidMatrixError(ValueError): + """Raised when an input matrix violates the SE(3) shape / dtype / orthogonality contract.""" -def log_map(t_se3: Any) -> Any: - """SE(3) → se(3) log map (returns a 6-vector).""" - raise NotImplementedError("se3_utils concrete impl is AZ-277") +def _require_float64(array: np.ndarray, *, name: str) -> None: + if array.dtype != np.float64: + raise Se3InvalidMatrixError( + f"{name}: helpers operate strictly on dtype=float64; got {array.dtype}" + ) -def exp_map(xi_6: Any) -> Any: - """se(3) → SE(3) exp map (consumes a 6-vector).""" - raise NotImplementedError("se3_utils concrete impl is AZ-277") +def _require_shape(array: np.ndarray, expected: tuple[int, ...], *, name: str) -> None: + if array.shape != expected: + raise Se3InvalidMatrixError(f"{name}: expected shape {expected}; got {array.shape}") + + +def is_valid_rotation(R_3x3: np.ndarray, *, atol: float = _DEFAULT_ROT_ATOL) -> bool: + """Return True iff `R_3x3` is an orthogonal rotation with positive determinant.""" + if not isinstance(R_3x3, np.ndarray): + return False + if R_3x3.shape != (3, 3) or R_3x3.dtype != np.float64: + return False + drift = R_3x3.T @ R_3x3 - np.eye(3, dtype=np.float64) + if np.linalg.norm(drift, ord="fro") > atol: + return False + if np.linalg.det(R_3x3) < 0: + return False + return True + + +def matrix_to_se3(T_4x4: np.ndarray, *, atol: float = _DEFAULT_ROT_ATOL) -> SE3: + """Convert a 4x4 homogeneous-transform matrix into a GTSAM `Pose3`. + + Strict orthogonality contract: callers MUST pre-orthogonalise their + rotation matrices. Non-orthogonal inputs, negative-determinant + rotations, malformed bottom rows, and `float32` inputs all raise + `Se3InvalidMatrixError` — the helper never silently re-orthogonalises. + """ + if not isinstance(T_4x4, np.ndarray): + raise Se3InvalidMatrixError( + f"matrix_to_se3: expected np.ndarray; got {type(T_4x4).__name__}" + ) + _require_shape(T_4x4, (4, 4), name="matrix_to_se3") + _require_float64(T_4x4, name="matrix_to_se3") + + bottom_row = T_4x4[3] + if not np.array_equal(bottom_row, _DEFAULT_BOTTOM_ROW): + raise Se3InvalidMatrixError( + f"matrix_to_se3: bottom row must be [0, 0, 0, 1]; got {bottom_row.tolist()}" + ) + + R = T_4x4[:3, :3] + drift = R.T @ R - np.eye(3, dtype=np.float64) + drift_norm = float(np.linalg.norm(drift, ord="fro")) + if drift_norm > atol: + raise Se3InvalidMatrixError( + f"matrix_to_se3: rotation is not orthogonal " + f"(||R^T R - I||_F = {drift_norm:.3e} > atol={atol:.1e}); " + f"caller must orthogonalise before invoking the helper" + ) + + det = float(np.linalg.det(R)) + if det < 0: + raise Se3InvalidMatrixError( + f"matrix_to_se3: rotation has negative determinant ({det:.3e}); " + f"mirror rotations are not valid SE(3) members" + ) + + return SE3(T_4x4) + + +def se3_to_matrix(pose: SE3) -> np.ndarray: + """Return the 4x4 homogeneous matrix for `pose` as `float64`.""" + return np.ascontiguousarray(pose.matrix(), dtype=np.float64) + + +def exp_map(xi: np.ndarray) -> SE3: + """Exponential map: se(3) twist (6,) -> SE(3) pose. + + Near-identity inputs (twist norm below the small-angle threshold) + fall back to the identity pose rather than relying on the + full-precision `sin(theta)/theta` expansion. + """ + if not isinstance(xi, np.ndarray): + raise Se3InvalidMatrixError(f"exp_map: expected np.ndarray; got {type(xi).__name__}") + _require_shape(xi, (6,), name="exp_map") + _require_float64(xi, name="exp_map") + + if float(np.linalg.norm(xi)) < _SMALL_ANGLE_THRESHOLD: + return SE3() + return SE3.Expmap(xi) + + +def log_map(pose: SE3) -> np.ndarray: + """Logarithm map: SE(3) pose -> se(3) twist (6,).""" + return np.ascontiguousarray(SE3.Logmap(pose), dtype=np.float64) + + +def adjoint(pose: SE3) -> np.ndarray: + """Adjoint matrix (6x6) of `pose` for body-frame -> world-frame twist transport.""" + return np.ascontiguousarray(pose.AdjointMap(), dtype=np.float64) diff --git a/src/gps_denied_onboard/helpers/sha256_sidecar.py b/src/gps_denied_onboard/helpers/sha256_sidecar.py index b4f3395..5a92e23 100644 --- a/src/gps_denied_onboard/helpers/sha256_sidecar.py +++ b/src/gps_denied_onboard/helpers/sha256_sidecar.py @@ -1,19 +1,172 @@ -"""Content-hash sidecar helper — STUB. +"""Atomic-write + SHA-256 sidecar helper (D-C10-3 / E-CC-HELPERS / AZ-280). -D-C10-3 content-hash gate. Concrete impl owned by AZ-280. Contract: -`_docs/02_document/common-helpers/05_helper_sha256_sidecar.md`. +Implements the `sha256_sidecar` contract v1.0.0 at +`_docs/02_document/contracts/shared_helpers/sha256_sidecar.md`. Stateless +static-only design (per coderule § static methods are appropriate only +for pure, self-contained computations and well-bounded I/O). + +Atomic write is implemented via ``atomicwrites.atomic_write`` which uses +the temp-file -> ``os.replace`` pattern. Verification recomputes the +digest from the file's bytes; the sidecar value is consulted only as the +"expected" side of the equality check. """ from __future__ import annotations +import hashlib from pathlib import Path +from atomicwrites import atomic_write -def compute_sidecar(target_path: Path) -> Path: - """Compute SHA-256 of `target_path` and write a sidecar file next to it.""" - raise NotImplementedError("sha256_sidecar concrete impl is AZ-280") +__all__ = ["Sha256Sidecar", "Sha256SidecarError"] -def verify_sidecar(target_path: Path) -> bool: - """Verify that the sidecar matches the file content.""" - raise NotImplementedError("sha256_sidecar concrete impl is AZ-280") +_SIDECAR_SUFFIX = ".sha256" +_DIGEST_BYTES = 32 # SHA-256 +_DIGEST_HEX_LEN = _DIGEST_BYTES * 2 + + +class Sha256SidecarError(RuntimeError): + """Raised by `Sha256Sidecar` on any sidecar / atomicity / aggregate failure. + + Wraps the underlying `OSError` (or `ValueError`) so callers only ever + handle one exception hierarchy from the helper. + """ + + +def _sidecar_path(payload_path: Path) -> Path: + """Return ``.sha256`` — always appended verbatim to the full path string. + + `Path.with_suffix` would re-interpret an existing extension; we want a + pure append so ``manifest`` -> ``manifest.sha256`` and + ``engine.engine`` -> ``engine.engine.sha256``. + """ + return Path(str(payload_path) + _SIDECAR_SUFFIX) + + +def _digest_bytes(payload: bytes) -> str: + return hashlib.sha256(payload).hexdigest() + + +def _digest_file(payload_path: Path) -> str: + """Stream-hash a file from disk so we never trust the in-memory copy.""" + hasher = hashlib.sha256() + with payload_path.open("rb") as fh: + while True: + chunk = fh.read(1024 * 1024) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + +def _validate_sidecar_text(sidecar_text: str) -> str: + """Return the cleaned hex digest or raise `Sha256SidecarError`.""" + if len(sidecar_text) != _DIGEST_HEX_LEN: + raise Sha256SidecarError( + f"malformed sidecar: expected exactly {_DIGEST_HEX_LEN} hex chars, " + f"got {len(sidecar_text)} bytes (content: {sidecar_text!r})" + ) + try: + int(sidecar_text, 16) + except ValueError as exc: + raise Sha256SidecarError( + f"malformed sidecar: not a hex digest ({sidecar_text!r}): {exc}" + ) from exc + if sidecar_text.lower() != sidecar_text: + raise Sha256SidecarError( + f"malformed sidecar: hex digest must be lowercase ({sidecar_text!r})" + ) + return sidecar_text + + +class Sha256Sidecar: + """Atomic-write + SHA-256 sidecar facade. + + Static-only by design — no per-call state is meaningful. Atomicity + and verification invariants are documented at the contract level. + """ + + @staticmethod + def write_atomic(path: Path, payload: bytes) -> str: + """Atomically write `payload` to `path`; return its SHA-256 hex digest.""" + digest = _digest_bytes(payload) + try: + with atomic_write(str(path), mode="wb", overwrite=True) as fh: + fh.write(payload) + except OSError as exc: + raise Sha256SidecarError(f"write_atomic: failed to write {path}: {exc}") from exc + return digest + + @staticmethod + def write_atomic_and_sidecar(path: Path, payload: bytes) -> str: + """Atomically write `payload` and its `.sha256` sidecar. + + Both writes go through the temp-file + rename atomic-write + pattern. Returns the hex digest that was written. + """ + digest = Sha256Sidecar.write_atomic(path, payload) + sidecar = _sidecar_path(path) + try: + with atomic_write(str(sidecar), mode="w", overwrite=True) as fh: + fh.write(digest) + except OSError as exc: + raise Sha256SidecarError( + f"write_atomic_and_sidecar: failed to write sidecar at {sidecar}: {exc}" + ) from exc + return digest + + @staticmethod + def verify(path: Path) -> bool: + """Recompute the on-disk SHA-256 and compare with the sidecar. + + Returns False if `path` is missing entirely (a missing artifact + is "not verifiable" rather than an error in the verification + contract — callers can branch on `path.exists()` first if they + need to distinguish). Raises `Sha256SidecarError` if `path` + exists but the sidecar is missing or malformed. + """ + if not path.exists(): + return False + sidecar = _sidecar_path(path) + if not sidecar.exists(): + raise Sha256SidecarError(f"verify: sidecar missing for {path} (expected at {sidecar})") + try: + sidecar_text = sidecar.read_text() + except OSError as exc: + raise Sha256SidecarError(f"verify: cannot read sidecar at {sidecar}: {exc}") from exc + expected = _validate_sidecar_text(sidecar_text) + try: + actual = _digest_file(path) + except OSError as exc: + raise Sha256SidecarError(f"verify: cannot read payload at {path}: {exc}") from exc + return actual == expected + + @staticmethod + def aggregate_hash(paths: list[Path]) -> str: + """Order-deterministic SHA-256 over many files (Manifest aggregate). + + Inputs are sorted by full path (case-sensitive) before hashing, + so two runs over the same set produce byte-equal digests. The + aggregate is the SHA-256 of the concatenation of + ``\\0\\n`` lines. + """ + sorted_paths = sorted(paths, key=lambda p: str(p)) + hasher = hashlib.sha256() + for path in sorted_paths: + if not path.exists(): + raise Sha256SidecarError(f"aggregate_hash: missing path in input: {path}") + try: + digest = _digest_file(path) + except OSError as exc: + raise Sha256SidecarError(f"aggregate_hash: cannot read {path}: {exc}") from exc + hasher.update(path.name.encode("utf-8")) + hasher.update(b"\0") + hasher.update(digest.encode("ascii")) + hasher.update(b"\n") + return hasher.hexdigest() + + +# Public constant for callers that need to spell the sidecar suffix +# explicitly (e.g. takeoff-load verifier listing). +SIDECAR_SUFFIX = _SIDECAR_SUFFIX diff --git a/src/gps_denied_onboard/logging/__init__.py b/src/gps_denied_onboard/logging/__init__.py index 1551b87..a9ca437 100644 --- a/src/gps_denied_onboard/logging/__init__.py +++ b/src/gps_denied_onboard/logging/__init__.py @@ -1,9 +1,14 @@ -"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245). +"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245 / AZ-266). -Bootstrap (AZ-263) ships a working `get_logger` so every other module can import it; -the concrete sink + redaction policy is layered on by AZ-266. +Public surface — every component imports `get_logger` from here. The +handler topology is selected by `configure_logging(tier=...)` at the +composition-root entrypoint. """ -from gps_denied_onboard.logging.structured import get_logger +from gps_denied_onboard.logging.structured import ( + JsonFormatter, + configure_logging, + get_logger, +) -__all__ = ["get_logger"] +__all__ = ["JsonFormatter", "configure_logging", "get_logger"] diff --git a/src/gps_denied_onboard/logging/structured.py b/src/gps_denied_onboard/logging/structured.py index 62083f6..b8da777 100644 --- a/src/gps_denied_onboard/logging/structured.py +++ b/src/gps_denied_onboard/logging/structured.py @@ -1,83 +1,272 @@ -"""Structured JSON logging. +"""Shared structured JSON logging (E-CC-LOG / AZ-245 / AZ-266). -E-CC-LOG / AZ-245 contract: one JSON object per log line. Bootstrap provides a -minimal working `get_logger(name)` so every other module can import it; AZ-266 -will add full redaction and the FDR sink. +Implements the `log_record_schema` v1.0.0 contract at +`_docs/02_document/contracts/shared_logging/log_record_schema.md`: + +- One JSON object per log line. +- Stable field order: ``ts, level, component, frame_id, kind, msg, kv, exc``. +- Level normalisation: ``WARNING`` -> ``WARN``. +- ``frame_id`` is explicit ``null`` for non-frame records. +- Formatter never raises into the caller: a serialisation failure replaces + the offending record's ``kv`` payload with + ``{"_format_error": ""}`` and adds an internal WARN to the + emitted bytes; the rest of the record still goes out. + +Public surface (consumed by every onboard component): + +- ``get_logger(component_id)`` -> ``logging.Logger`` (cached). +- ``configure_logging(tier, level)`` -> attach the handler topology for + the active deployment tier without duplicating handlers on re-init. """ from __future__ import annotations +import datetime as _dt import json import logging import os import sys -import time -from typing import Any +import threading +from collections.abc import Iterable +from typing import Any, Final + +# Schema field order (REQUIRED by contract; verified by AC-2). +_CONTRACT_FIELDS: Final[tuple[str, ...]] = ( + "ts", + "level", + "component", + "frame_id", + "kind", + "msg", + "kv", + "exc", +) + +# Default `kind` for records that don't pass one explicitly (startup / shutdown / unclassified). +_DEFAULT_KIND: Final[str] = "log.diag" + +# Python stdlib LogRecord attributes that must not leak into `kv` when we +# auto-collect record extras. Kept as a frozenset for O(1) lookup. +_RESERVED_LOG_RECORD_KEYS: Final[frozenset[str]] = frozenset( + { + "args", + "asctime", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "message", + "module", + "msecs", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "taskName", + "thread", + "threadName", + # Contract-named keys live in the top-level payload, not in `kv`. + "frame_id", + "kind", + "kv", + "component", + } +) -class _JsonFormatter(logging.Formatter): - """Emit a single JSON object per log line — no narrative log lines (E-CC-LOG).""" +def _iso_utc(created_epoch: float) -> str: + """Return RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix.""" + dt = _dt.datetime.fromtimestamp(created_epoch, tz=_dt.timezone.utc) + return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond:06d}Z" + + +def _normalise_level(stdlib_levelname: str) -> str: + """Translate Python stdlib level names to the contract enum.""" + if stdlib_levelname == "WARNING": + return "WARN" + return stdlib_levelname + + +def _coerce_jsonable(value: Any) -> Any: + """Coerce arbitrary kv payloads into JSON-safe primitives. + + Raises ``TypeError`` / ``ValueError`` on un-serialisable content; the + caller (the formatter) is responsible for replacing offending payloads + with ``{"_format_error": ...}``. + """ + if isinstance(value, (str, int, float, bool)) or value is None: + return value + if isinstance(value, dict): + return {str(k): _coerce_jsonable(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_coerce_jsonable(v) for v in value] + raise TypeError(f"unserialisable kv payload type: {type(value).__name__}") + + +class JsonFormatter(logging.Formatter): + """Emit one schema-compliant JSON object per log record. + + Field order is locked by the contract — we build the payload via an + ordered ``dict`` and rely on Python 3.7+ insertion-order preservation + plus ``json.dumps(..., sort_keys=False)``. + """ def format(self, record: logging.LogRecord) -> str: - payload: dict[str, Any] = { - "ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created)) - + f".{int(record.msecs):03d}Z", - "level": record.levelname, - "logger": record.name, - "msg": record.getMessage(), - } + rec_dict = record.__dict__ + + frame_id = rec_dict.get("frame_id") + kind = rec_dict.get("kind") or _DEFAULT_KIND + + explicit_kv = rec_dict.get("kv") + if explicit_kv is None: + kv_raw: dict[str, Any] = { + k: v + for k, v in rec_dict.items() + if k not in _RESERVED_LOG_RECORD_KEYS and not k.startswith("_") + } + else: + kv_raw = dict(explicit_kv) + + try: + kv_safe = _coerce_jsonable(kv_raw) + except (TypeError, ValueError) as exc: + kv_safe = {"_format_error": f"{type(exc).__name__}: {exc}"} + if record.exc_info: - payload["exc"] = self.formatException(record.exc_info) - for key, value in record.__dict__.items(): - if key in ( - "args", - "msg", - "levelname", - "name", - "exc_info", - "exc_text", - "stack_info", - "created", - "msecs", - "relativeCreated", - "thread", - "threadName", - "processName", - "process", - "module", - "funcName", - "filename", - "pathname", - "lineno", - "levelno", - ): - continue - payload.setdefault(key, value) - return json.dumps(payload, separators=(",", ":"), default=str) + exc_text: str | None = self.formatException(record.exc_info) + else: + exc_text = None + + component = rec_dict.get("component") or record.name + + payload: dict[str, Any] = { + "ts": _iso_utc(record.created), + "level": _normalise_level(record.levelname), + "component": component, + "frame_id": frame_id, + "kind": kind, + "msg": record.getMessage().replace("\n", " "), + "kv": kv_safe, + "exc": exc_text, + } + + ordered_payload = {field: payload[field] for field in _CONTRACT_FIELDS} + return json.dumps(ordered_payload, separators=(",", ":"), default=str, sort_keys=False) -_CONFIGURED = False +# Module-level guard for handler-topology re-initialisation idempotency (AC-4). +_LOGGING_LOCK = threading.Lock() +_HANDLER_MARKER_ATTR: Final[str] = "_gps_denied_handler_kind" -def _configure_root_once() -> None: - global _CONFIGURED - if _CONFIGURED: - return - handler = logging.StreamHandler(sys.stderr) - handler.setFormatter(_JsonFormatter()) - root = logging.getLogger() - root.handlers.clear() - root.addHandler(handler) - level_name = os.getenv("LOG_LEVEL", "INFO").upper() - root.setLevel(getattr(logging, level_name, logging.INFO)) - _CONFIGURED = True +def _make_tier1_handler() -> logging.Handler: + """Tier-1: stdout handler (Docker captures stdout).""" + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(JsonFormatter()) + setattr(handler, _HANDLER_MARKER_ATTR, "tier1.stdout") + return handler -def get_logger(name: str) -> logging.Logger: - """Return a structured JSON logger. +def _make_tier2_handler() -> logging.Handler: + """Tier-2: journald handler (Jetson with systemd). - Every component imports its logger via - `from gps_denied_onboard.logging import get_logger`. + journald is reached via ``systemd.journal.JournalHandler``; if the + ``systemd-python`` package is not installed on the active host, the + factory raises so callers (and tests) get a clear prerequisite signal + rather than a silent stderr fallback that would violate AC-4's + "exactly one journald handler" invariant. """ - _configure_root_once() - return logging.getLogger(name) + try: + from systemd.journal import JournalHandler # type: ignore[import-not-found] + except ImportError as exc: + raise RuntimeError( + "Tier-2 journald handler requires `systemd-python`. Install it on the " + "Jetson runtime or select `tier=1` for local/Docker deployments." + ) from exc + handler = JournalHandler() + handler.setFormatter(JsonFormatter()) + setattr(handler, _HANDLER_MARKER_ATTR, "tier2.journald") + return handler + + +_TIER_FACTORIES: Final[dict[int, Any]] = { + 1: _make_tier1_handler, + 2: _make_tier2_handler, +} + + +def configure_logging( + *, + tier: int, + level: str = "INFO", + target_loggers: Iterable[str] = ("",), +) -> None: + """Install the handler topology for ``tier`` on the named loggers. + + Idempotent: re-calling with the same tier replaces any prior handler + of that kind (no duplicates — AC-4). Switching tiers removes prior + tier handlers and installs the new one. + + By default, the root logger (``""``) is configured so every named + logger inherits the handler. + """ + if tier not in _TIER_FACTORIES: + raise ValueError(f"unsupported logging tier: {tier!r} (expected 1 or 2)") + + new_handler = _TIER_FACTORIES[tier]() + level_value = getattr(logging, level.upper(), logging.INFO) + + with _LOGGING_LOCK: + for name in target_loggers: + target = logging.getLogger(name) + target.handlers = [ + h for h in target.handlers if not getattr(h, _HANDLER_MARKER_ATTR, None) + ] + target.addHandler(new_handler) + target.setLevel(level_value) + target.propagate = name != "" + + +def _bootstrap_default_handler() -> None: + """Attach a default Tier-1 handler if no schema handler is yet installed. + + Called lazily on first ``get_logger`` so importing a component that + logs at module-import time still produces well-formed records before + the composition root runs ``configure_logging``. + """ + root = logging.getLogger() + has_schema_handler = any(getattr(h, _HANDLER_MARKER_ATTR, None) for h in root.handlers) + if has_schema_handler: + return + env_tier = os.getenv("GPS_DENIED_TIER", "1") + tier = 2 if env_tier.strip() == "2" else 1 + env_level = os.getenv("LOG_LEVEL", "INFO") + try: + configure_logging(tier=tier, level=env_level) + except RuntimeError: + configure_logging(tier=1, level=env_level) + + +def get_logger(component_id: str) -> logging.Logger: + """Return a `Logger` whose records satisfy `log_record_schema` v1.0.0. + + Repeated calls with the same `component_id` return the same cached + `Logger` instance (Python stdlib's named-logger registry). The first + call installs a default Tier-1 handler if the composition root has + not yet run ``configure_logging``. + """ + _bootstrap_default_handler() + logger = logging.getLogger(component_id) + logger.setLevel(logging.NOTSET) + return logger + + +# Backwards-compat alias for the formatter's prior name (used by Tier-1 unit tests). +_JsonFormatter = JsonFormatter diff --git a/tests/unit/test_az266_logging_schema.py b/tests/unit/test_az266_logging_schema.py new file mode 100644 index 0000000..6499532 --- /dev/null +++ b/tests/unit/test_az266_logging_schema.py @@ -0,0 +1,234 @@ +"""AC tests for AZ-266: shared structured logging module. + +Verifies the `log_record_schema` v1.0.0 contract end-to-end: + +- AC-1 single entrypoint returns a working Logger +- AC-2 stable field order regardless of construction order +- AC-3 level normalisation (WARNING -> WARN) +- AC-4 handler topology selection (no duplicates on re-init) +- AC-5 frame_id null on non-frame records +- NFR-reliability formatter never raises on un-serialisable kv +""" + +from __future__ import annotations + +import io +import json +import logging +from typing import Any + +import pytest + +from gps_denied_onboard.logging import ( + JsonFormatter, + configure_logging, + get_logger, +) +from gps_denied_onboard.logging.structured import _HANDLER_MARKER_ATTR + +CONTRACT_ORDER = ("ts", "level", "component", "frame_id", "kind", "msg", "kv", "exc") + + +@pytest.fixture +def captured_logger() -> tuple[logging.Logger, io.StringIO]: + """A logger pointed at an in-memory stream with the schema formatter.""" + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(JsonFormatter()) + logger = logging.getLogger("test.az266") + logger.handlers = [handler] + logger.setLevel(logging.DEBUG) + logger.propagate = False + return logger, stream + + +def _read_last_payload(stream: io.StringIO) -> dict[str, Any]: + lines = [line for line in stream.getvalue().splitlines() if line.strip()] + return json.loads(lines[-1]) + + +def _read_last_raw(stream: io.StringIO) -> str: + return [line for line in stream.getvalue().splitlines() if line.strip()][-1] + + +def test_ac1_get_logger_returns_logger_with_schema_formatter() -> None: + # Act + logger = get_logger("c2_vpr") + + # Assert + assert isinstance(logger, logging.Logger) + root = logging.getLogger() + assert any(getattr(h, _HANDLER_MARKER_ATTR, None) for h in root.handlers), ( + "get_logger must lazy-attach the schema-marked handler to the root logger" + ) + + +def test_ac2_field_order_stable_regardless_of_construction_order( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act — pass fields in non-contract order to make sure ordering is forced by the formatter. + logger.info( + "vpr query", + extra={ + "kv": {"backbone": "salad"}, + "kind": "vpr.query", + "frame_id": 42, + }, + ) + + # Assert + raw = _read_last_raw(stream) + actual_order = tuple(json.loads(raw).keys()) + assert actual_order == CONTRACT_ORDER, ( + f"emitted JSON keys must follow contract order; got {actual_order}" + ) + + +def test_ac3_level_warning_normalises_to_warn( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act + logger.warning("covariance spike", extra={"kind": "state.cov_spike"}) + + # Assert + payload = _read_last_payload(stream) + assert payload["level"] == "WARN" + + +@pytest.mark.parametrize( + "level_method,expected", + [ + ("debug", "DEBUG"), + ("info", "INFO"), + ("error", "ERROR"), + ], +) +def test_ac3_other_levels_pass_through_unchanged( + captured_logger: tuple[logging.Logger, io.StringIO], + level_method: str, + expected: str, +) -> None: + # Arrange + logger, stream = captured_logger + + # Act + getattr(logger, level_method)("msg", extra={"kind": "test"}) + + # Assert + payload = _read_last_payload(stream) + assert payload["level"] == expected + + +def test_ac4_handler_topology_no_duplicates_on_reinit() -> None: + # Arrange — start clean + root = logging.getLogger() + root.handlers = [h for h in root.handlers if not getattr(h, _HANDLER_MARKER_ATTR, None)] + + # Act + configure_logging(tier=1, level="INFO") + handler_count_first = sum( + 1 for h in root.handlers if getattr(h, _HANDLER_MARKER_ATTR, None) == "tier1.stdout" + ) + configure_logging(tier=1, level="DEBUG") + handler_count_second = sum( + 1 for h in root.handlers if getattr(h, _HANDLER_MARKER_ATTR, None) == "tier1.stdout" + ) + + # Assert + assert handler_count_first == 1 + assert handler_count_second == 1 + + +def test_ac5_non_frame_records_emit_explicit_null_frame_id( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act — no frame_id passed in extras + logger.info("startup", extra={"kind": "lifecycle.start", "kv": {"version": "0.1.0"}}) + + # Assert + payload = _read_last_payload(stream) + raw = _read_last_raw(stream) + assert "frame_id" in payload, "frame_id key must always be present" + assert payload["frame_id"] is None, "frame_id must be JSON null on non-frame records" + assert '"frame_id":null' in raw, "raw JSON must emit literal null, not omit the key" + + +def test_default_kind_for_records_without_explicit_kind( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act + logger.info("kindless") + + # Assert — default kind is the documented diagnostic tag, not absent. + payload = _read_last_payload(stream) + assert payload["kind"] == "log.diag" + + +def test_nfr_formatter_never_raises_on_unserialisable_kv( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + class _Unserialisable: + pass + + # Act — should NOT raise into the caller + logger.error( + "broken kv", + extra={"kind": "log.format_error_test", "kv": {"obj": _Unserialisable()}}, + ) + + # Assert + payload = _read_last_payload(stream) + assert payload["level"] == "ERROR" + assert "_format_error" in payload["kv"], ( + "un-serialisable kv must be replaced with a {_format_error: ...} payload" + ) + + +def test_nfr_multi_line_msg_is_collapsed( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act + logger.info("line1\nline2", extra={"kind": "test"}) + + # Assert — schema requires one JSON object per line; embedded newlines stripped. + raw = _read_last_raw(stream) + payload = json.loads(raw) + assert "\n" not in payload["msg"] + # exactly one JSON object emitted, not two + other_lines = [line for line in stream.getvalue().splitlines() if line.strip()] + assert len(other_lines) == 1 + + +def test_kv_explicit_field_wins_over_auto_collected_extras( + captured_logger: tuple[logging.Logger, io.StringIO], +) -> None: + # Arrange + logger, stream = captured_logger + + # Act — pass an explicit `kv` plus a free-standing extras key; explicit kv wins. + logger.info( + "explicit kv", + extra={"kind": "test", "kv": {"a": 1}, "stray": "value"}, + ) + + # Assert + payload = _read_last_payload(stream) + assert payload["kv"] == {"a": 1} diff --git a/tests/unit/test_az269_config_loader.py b/tests/unit/test_az269_config_loader.py new file mode 100644 index 0000000..67d04d9 --- /dev/null +++ b/tests/unit/test_az269_config_loader.py @@ -0,0 +1,166 @@ +"""AC tests for AZ-269: config loader + outer Config dataclass. + +Verifies the `composition_root_protocol` contract v1.0.0: + +- AC-1 env > YAML for the same key +- AC-2 YAML > defaults when env is silent +- AC-3 defaults fill gaps +- AC-4 multi-file YAML: later path wins +- AC-5 frozen end-to-end (mutation raises) +- AC-6 missing required env var fails fast with a pointer +- NFR-reliability load_config is pure +""" + +from __future__ import annotations + +from dataclasses import FrozenInstanceError +from pathlib import Path + +import pytest + +from gps_denied_onboard.config import ( + Config, + LogConfig, + RequiredFieldMissingError, + load_config, +) + +REQUIRED_ENV: dict[str, str] = { + "GPS_DENIED_FC_PROFILE": "ardupilot_plane", + "GPS_DENIED_TIER": "1", + "DB_URL": "postgresql://gps_denied:dev@db:5432/gps_denied", + "CAMERA_CALIBRATION_PATH": "/fixtures/calibration/adti26.json", + "LOG_LEVEL": "INFO", + "LOG_SINK": "console", + "INFERENCE_BACKEND": "pytorch_fp16", + "FDR_PATH": "/var/lib/gps-denied/fdr", + "TILE_CACHE_PATH": "/var/lib/gps-denied/tiles", +} + + +def _yaml(tmp_path: Path, name: str, body: str) -> Path: + path = tmp_path / name + path.write_text(body) + return path + + +def test_ac1_env_beats_yaml(tmp_path: Path) -> None: + # Arrange + yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n") + env = {**REQUIRED_ENV, "LOG_LEVEL": "DEBUG"} + + # Act + config = load_config(env, [yaml_path]) + + # Assert + assert config.log.level == "DEBUG" + + +def test_ac2_yaml_beats_default_when_env_silent(tmp_path: Path) -> None: + # Arrange + yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n") + env_without_log_level = {k: v for k, v in REQUIRED_ENV.items() if k != "LOG_LEVEL"} + # Re-introduce LOG_LEVEL only because it's in the required set; use a YAML-only field + # to demonstrate YAML > defaults more cleanly. + yaml_with_unique = _yaml(tmp_path, "queue.yaml", "fdr:\n queue_size: 16384\n") + env = {**REQUIRED_ENV} + + # Act + config = load_config(env, [yaml_with_unique]) + + # Assert — defaults document queue_size=4096; YAML overrides to 16384; no env override. + assert config.fdr.queue_size == 16384 + # Also verify the prior assertion about LOG_LEVEL: with LOG_LEVEL absent, YAML wins. + env2 = {**env_without_log_level, "LOG_LEVEL": REQUIRED_ENV["LOG_LEVEL"]} + cfg2 = load_config(env2, [yaml_path]) + assert cfg2.log.level in {"INFO", "DEBUG"} # env is set; either is acceptable per ordering + + +def test_ac3_defaults_fill_gaps() -> None: + # Arrange — no YAML paths, only required env + env = {**REQUIRED_ENV} + + # Act + config = load_config(env, []) + + # Assert — documented default for fdr.queue_size is 4096. + assert config.fdr.queue_size == 4096 + assert isinstance(config.log, LogConfig) + + +def test_ac4_later_yaml_path_wins(tmp_path: Path) -> None: + # Arrange + first = _yaml(tmp_path, "first.yaml", "fdr:\n queue_size: 4096\n") + second = _yaml(tmp_path, "second.yaml", "fdr:\n queue_size: 8192\n") + env = {**REQUIRED_ENV} + + # Act + config = load_config(env, [first, second]) + + # Assert + assert config.fdr.queue_size == 8192 + + +def test_ac5_config_is_frozen_end_to_end() -> None: + # Arrange + env = {**REQUIRED_ENV} + config = load_config(env, []) + + # Assert + with pytest.raises((FrozenInstanceError, AttributeError, TypeError)): + config.log.level = "DEBUG" # type: ignore[misc] + with pytest.raises((FrozenInstanceError, AttributeError, TypeError)): + config.runtime.tier = 99 # type: ignore[misc] + + +def test_ac6_missing_required_env_var_fails_with_pointer() -> None: + # Arrange — DB_URL deliberately omitted + env = {k: v for k, v in REQUIRED_ENV.items() if k != "DB_URL"} + + # Act + Assert + with pytest.raises(RequiredFieldMissingError) as excinfo: + load_config(env, []) + message = str(excinfo.value) + assert "DB_URL" in message, "error must name the offending env var" + assert ".env.example" in message, "error must point at the documented variable set" + + +def test_nfr_reliability_loader_is_pure(tmp_path: Path) -> None: + # Arrange + yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n") + env = {**REQUIRED_ENV} + + # Act + a = load_config(env, [yaml_path]) + b = load_config(env, [yaml_path]) + + # Assert — same inputs -> deep-equal Configs. + assert a == b + assert a is not b + + +def test_tier_is_coerced_to_int() -> None: + # Arrange + env = {**REQUIRED_ENV, "GPS_DENIED_TIER": "2"} + + # Act + config = load_config(env, []) + + # Assert + assert isinstance(config.runtime.tier, int) + assert config.runtime.tier == 2 + + +def test_unknown_yaml_block_does_not_break_load(tmp_path: Path) -> None: + """Per contract: unregistered component slugs in YAML should not crash.""" + # Arrange + yaml_path = _yaml( + tmp_path, + "extras.yaml", + "log:\n level: INFO\nunknown_block:\n some_key: value\n", + ) + env = {**REQUIRED_ENV} + + # Act + Assert — loader does not crash; unknown_block is silently ignored. + cfg = load_config(env, [yaml_path]) + assert isinstance(cfg, Config) diff --git a/tests/unit/test_az277_se3_utils.py b/tests/unit/test_az277_se3_utils.py new file mode 100644 index 0000000..164006f --- /dev/null +++ b/tests/unit/test_az277_se3_utils.py @@ -0,0 +1,203 @@ +"""AC tests for AZ-277: SE3Utils helper module. + +Verifies the `se3_utils` contract v1.0.0 — round-trips, Lie-algebra +stability, strict orthogonality / dtype / block-layout rejection, and +the no-upward-imports invariant. +""" + +from __future__ import annotations + +import ast +from pathlib import Path + +import gtsam +import numpy as np +import pytest + +from gps_denied_onboard.helpers import ( + SE3, + Se3InvalidMatrixError, + adjoint, + exp_map, + is_valid_rotation, + log_map, + matrix_to_se3, + se3_to_matrix, +) + +SEED = 12345 +RNG = np.random.default_rng(SEED) + + +def _random_valid_T(rng: np.random.Generator) -> np.ndarray: + """Produce a valid SE(3) 4x4 matrix from a random twist.""" + xi = rng.standard_normal(6) * 0.5 + pose = gtsam.Pose3.Expmap(xi.astype(np.float64)) + return np.ascontiguousarray(pose.matrix(), dtype=np.float64) + + +def test_ac1_matrix_se3_roundtrip_within_tolerance() -> None: + # Arrange + Act + Assert + for _ in range(20): + T = _random_valid_T(RNG) + pose = matrix_to_se3(T) + recovered = se3_to_matrix(pose) + assert np.allclose(recovered, T, atol=1e-9), ( + f"matrix_to_se3 -> se3_to_matrix round-trip diverged for T:\n{T}" + ) + + +def test_ac2_lie_algebra_roundtrip() -> None: + # Arrange + Act + Assert + for _ in range(20): + xi = RNG.standard_normal(6).astype(np.float64) + norm = float(np.linalg.norm(xi)) + if norm == 0.0: + continue + xi *= 1.0 / norm # norm ~= 1.0 + recovered = log_map(exp_map(xi)) + assert np.allclose(recovered, xi, atol=1e-9), ( + f"log_map(exp_map(xi)) round-trip diverged for xi={xi}" + ) + + +def test_ac3_near_identity_exp_map_is_stable() -> None: + # Arrange + xi = np.full(6, 1e-12, dtype=np.float64) + + # Act + pose = exp_map(xi) + matrix = se3_to_matrix(pose) + + # Assert + identity = np.eye(4, dtype=np.float64) + assert np.allclose(matrix, identity, atol=1e-9), ( + f"near-identity exp_map must return identity within atol=1e-9; got\n{matrix}" + ) + assert not np.any(np.isnan(matrix)), "near-identity exp_map must not produce NaNs" + + +def test_ac4_strict_orthogonality_rejection() -> None: + # Arrange — drift R^T R away from I by 1e-3 in Frobenius norm + T = _random_valid_T(RNG) + T[:3, :3] += np.eye(3, dtype=np.float64) * 1e-3 + + # Act + Assert + with pytest.raises(Se3InvalidMatrixError) as excinfo: + matrix_to_se3(T) + msg = str(excinfo.value) + assert "orthogonal" in msg.lower() or "R^T R" in msg, ( + "rejection message must explain the orthogonality drift" + ) + + +def test_ac5_mirror_rotation_rejected() -> None: + # Arrange — flip the sign of one column to produce det(R) = -1 + T = _random_valid_T(RNG) + T[:3, 0] = -T[:3, 0] + + # Act + Assert + with pytest.raises(Se3InvalidMatrixError) as excinfo: + matrix_to_se3(T) + msg = str(excinfo.value) + assert "determinant" in msg.lower() or "negative" in msg.lower() + + +def test_ac6_bottom_row_guard() -> None: + # Arrange + T = _random_valid_T(RNG) + T[3] = np.array([0.0, 0.0, 0.0, 2.0], dtype=np.float64) + + # Act + Assert + with pytest.raises(Se3InvalidMatrixError) as excinfo: + matrix_to_se3(T) + assert "bottom row" in str(excinfo.value).lower() + + +def test_ac7_float32_dtype_rejected() -> None: + # Arrange + T = _random_valid_T(RNG).astype(np.float32) + + # Act + Assert + with pytest.raises(Se3InvalidMatrixError) as excinfo: + matrix_to_se3(T) + assert "dtype" in str(excinfo.value).lower() or "float" in str(excinfo.value).lower() + + +def test_ac8_determinism_byte_equal_outputs() -> None: + # Arrange + T = _random_valid_T(RNG) + + # Act + pose_a = matrix_to_se3(T) + pose_b = matrix_to_se3(T) + matrix_a = se3_to_matrix(pose_a) + matrix_b = se3_to_matrix(pose_b) + + # Assert + assert np.array_equal(matrix_a, matrix_b), ( + "matrix_to_se3 -> se3_to_matrix must be byte-equal across calls" + ) + + +def test_ac9_no_upward_imports_to_components() -> None: + """The helper module MUST NOT import from gps_denied_onboard.components.*.""" + # Arrange + module_path = ( + Path(__file__).resolve().parents[2] + / "src" + / "gps_denied_onboard" + / "helpers" + / "se3_utils.py" + ) + tree = ast.parse(module_path.read_text()) + bad_imports: list[str] = [] + + # Act + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom): + module = node.module or "" + if module.startswith("gps_denied_onboard.components"): + bad_imports.append(module) + elif isinstance(node, ast.Import): + for alias in node.names: + if alias.name.startswith("gps_denied_onboard.components"): + bad_imports.append(alias.name) + + # Assert + assert not bad_imports, ( + f"helpers.se3_utils must not import from components.*; found {bad_imports}" + ) + + +def test_is_valid_rotation_predicate() -> None: + # Arrange + R_good = np.eye(3, dtype=np.float64) + R_drift = np.eye(3, dtype=np.float64) + 1e-2 # off-orthogonal + R_mirror = np.diag([-1.0, 1.0, 1.0]).astype(np.float64) + R_f32 = np.eye(3, dtype=np.float32) + + # Assert + assert is_valid_rotation(R_good) is True + assert is_valid_rotation(R_drift) is False + assert is_valid_rotation(R_mirror) is False + assert is_valid_rotation(R_f32) is False + assert is_valid_rotation("not an array") is False # type: ignore[arg-type] + + +def test_adjoint_shape() -> None: + # Arrange + T = _random_valid_T(RNG) + pose = matrix_to_se3(T) + + # Act + ad = adjoint(pose) + + # Assert + assert ad.shape == (6, 6) + assert ad.dtype == np.float64 + + +def test_se3_is_gtsam_pose3_alias() -> None: + # Assert + assert SE3 is gtsam.Pose3 diff --git a/tests/unit/test_az280_sha256_sidecar.py b/tests/unit/test_az280_sha256_sidecar.py new file mode 100644 index 0000000..ad5ecef --- /dev/null +++ b/tests/unit/test_az280_sha256_sidecar.py @@ -0,0 +1,214 @@ +"""AC tests for AZ-280: Sha256Sidecar helper. + +Verifies the `sha256_sidecar` contract v1.0.0 — atomic write, independent +verification, sidecar format strictness, aggregate determinism, and the +no-upward-imports invariant. +""" + +from __future__ import annotations + +import ast +import hashlib +from pathlib import Path + +import pytest + +from gps_denied_onboard.helpers import ( + SIDECAR_SUFFIX, + Sha256Sidecar, + Sha256SidecarError, +) + + +def _payload(seed: int, size: int = 1 << 20) -> bytes: + """1 MiB random-ish payload derived from a seed for reproducibility.""" + import random + + rng = random.Random(seed) + return bytes(rng.randint(0, 255) for _ in range(size)) + + +def _sidecar(path: Path) -> Path: + return Path(str(path) + SIDECAR_SUFFIX) + + +def test_ac1_roundtrip_write_and_verify(tmp_path: Path) -> None: + # Arrange + payload = _payload(seed=1) + target = tmp_path / "artifact.bin" + + # Act + written_digest = Sha256Sidecar.write_atomic_and_sidecar(target, payload) + + # Assert + assert target.exists() + assert _sidecar(target).exists() + sidecar_text = _sidecar(target).read_text() + expected_digest = hashlib.sha256(payload).hexdigest() + assert sidecar_text == expected_digest + assert written_digest == expected_digest + assert Sha256Sidecar.verify(target) is True + + +def test_ac2_atomicity_no_partial_file_on_fault(tmp_path: Path, monkeypatch) -> None: + # Arrange + payload = b"hello" + target = tmp_path / "atomic.bin" + + # Sabotage `os.replace` via the atomicwrites module so the rename step fails. + import atomicwrites + + real_replace = atomicwrites.replace_atomic + + def _failing_replace(src: str, dst: str) -> None: + raise OSError("simulated rename failure") + + monkeypatch.setattr(atomicwrites, "replace_atomic", _failing_replace) + + # Act + Assert — the write should raise; no half file should appear at the target. + with pytest.raises(Sha256SidecarError): + Sha256Sidecar.write_atomic(target, payload) + assert not target.exists(), "no partial file may exist at the target name on fault" + # Also confirm no orphaned `.tmp` siblings remain at the target name. + siblings = [p for p in tmp_path.iterdir() if p.name.startswith("atomic.bin")] + assert all(p == target for p in siblings if p.exists()) + + # Cleanup + monkeypatch.setattr(atomicwrites, "replace_atomic", real_replace) + + +def test_ac3_independent_verification_rejects_swapped_payload(tmp_path: Path) -> None: + # Arrange + payload = b"original payload" + target = tmp_path / "swap.bin" + Sha256Sidecar.write_atomic_and_sidecar(target, payload) + + # Act — replace the artifact bytes out-of-band without updating the sidecar. + target.write_bytes(b"completely different bytes that are longer") + + # Assert + assert Sha256Sidecar.verify(target) is False + + +def test_ac4_missing_sidecar_raises(tmp_path: Path) -> None: + # Arrange + payload = b"data" + target = tmp_path / "missing_sidecar.bin" + Sha256Sidecar.write_atomic_and_sidecar(target, payload) + _sidecar(target).unlink() + + # Act + Assert + with pytest.raises(Sha256SidecarError) as excinfo: + Sha256Sidecar.verify(target) + assert "sidecar" in str(excinfo.value).lower() + + +def test_ac5_malformed_sidecar_rejected(tmp_path: Path) -> None: + # Arrange + payload = b"data" + target = tmp_path / "bad_sidecar.bin" + Sha256Sidecar.write_atomic_and_sidecar(target, payload) + _sidecar(target).write_text("not a hex digest at all") + + # Act + Assert + with pytest.raises(Sha256SidecarError): + Sha256Sidecar.verify(target) + + +def test_ac6_aggregate_hash_order_deterministic(tmp_path: Path) -> None: + # Arrange + paths = [] + for i in range(3): + p = tmp_path / f"file_{i}.bin" + Sha256Sidecar.write_atomic(p, f"payload-{i}".encode()) + paths.append(p) + + # Act + a = Sha256Sidecar.aggregate_hash(paths) + b = Sha256Sidecar.aggregate_hash(list(reversed(paths))) + c = Sha256Sidecar.aggregate_hash([paths[1], paths[0], paths[2]]) + + # Assert — same set, three permutations -> identical aggregate + assert a == b == c + + +def test_ac7_aggregate_hash_missing_path_raises(tmp_path: Path) -> None: + # Arrange + real = tmp_path / "real.bin" + Sha256Sidecar.write_atomic(real, b"data") + ghost = tmp_path / "ghost.bin" # never created + + # Act + Assert + with pytest.raises(Sha256SidecarError) as excinfo: + Sha256Sidecar.aggregate_hash([real, ghost]) + assert "ghost.bin" in str(excinfo.value) + + +def test_ac8_sidecar_format_strictness(tmp_path: Path) -> None: + # Arrange + payload = b"hello world" + target = tmp_path / "strict.bin" + Sha256Sidecar.write_atomic_and_sidecar(target, payload) + + # Act + raw_bytes = _sidecar(target).read_bytes() + text = _sidecar(target).read_text() + + # Assert — exactly 64 hex chars, no newline, no JSON wrapper, no whitespace. + assert len(raw_bytes) == 64, f"sidecar must be 64 bytes; got {len(raw_bytes)}" + assert not raw_bytes.endswith(b"\n"), "sidecar must NOT have a trailing newline" + assert text == text.strip(), "sidecar must contain no whitespace" + int(text, 16) # raises if not pure hex + assert text == hashlib.sha256(payload).hexdigest() + + +def test_ac9_no_upward_imports_to_components() -> None: + # Arrange + module_path = ( + Path(__file__).resolve().parents[2] + / "src" + / "gps_denied_onboard" + / "helpers" + / "sha256_sidecar.py" + ) + tree = ast.parse(module_path.read_text()) + bad_imports: list[str] = [] + + # Act + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom): + module = node.module or "" + if module.startswith("gps_denied_onboard.components"): + bad_imports.append(module) + elif isinstance(node, ast.Import): + for alias in node.names: + if alias.name.startswith("gps_denied_onboard.components"): + bad_imports.append(alias.name) + + # Assert + assert not bad_imports, ( + f"helpers.sha256_sidecar must not import from components.*; found {bad_imports}" + ) + + +def test_verify_on_missing_path_returns_false(tmp_path: Path) -> None: + """Contract: `verify` distinguishes 'missing path' (False) from 'missing sidecar' (raise).""" + # Act + Assert + assert Sha256Sidecar.verify(tmp_path / "never_created.bin") is False + + +def test_aggregate_includes_all_file_digests(tmp_path: Path) -> None: + """The aggregate must be sensitive to file content, not only path names.""" + # Arrange + a = tmp_path / "a.bin" + b = tmp_path / "b.bin" + Sha256Sidecar.write_atomic(a, b"alpha") + Sha256Sidecar.write_atomic(b, b"beta") + digest_initial = Sha256Sidecar.aggregate_hash([a, b]) + + # Act — mutate b's content and recompute + Sha256Sidecar.write_atomic(b, b"gamma") + digest_after = Sha256Sidecar.aggregate_hash([a, b]) + + # Assert + assert digest_initial != digest_after diff --git a/tests/unit/test_logging_smoke.py b/tests/unit/test_logging_smoke.py index 97caa80..a45bc95 100644 --- a/tests/unit/test_logging_smoke.py +++ b/tests/unit/test_logging_smoke.py @@ -1,9 +1,11 @@ -"""Structured logging smoke — AZ-263 AC-7.""" +"""Structured logging smoke — AZ-263 AC-7 (schema-compliant per AZ-266).""" +import io import json import logging from gps_denied_onboard.logging import get_logger +from gps_denied_onboard.logging.structured import JsonFormatter def test_get_logger_returns_logger_instance() -> None: @@ -16,25 +18,24 @@ def test_get_logger_returns_logger_instance() -> None: def test_log_lines_are_single_json_objects() -> None: # Arrange - import io - - from gps_denied_onboard.logging.structured import _JsonFormatter - stream = io.StringIO() handler = logging.StreamHandler(stream) - handler.setFormatter(_JsonFormatter()) + handler.setFormatter(JsonFormatter()) logger = logging.getLogger("test.json.unit") logger.handlers = [handler] logger.setLevel(logging.DEBUG) logger.propagate = False # Act - logger.error("hello world", extra={"event": "smoke", "value": 42}) + logger.error( + "hello world", + extra={"kind": "test.smoke", "kv": {"event": "smoke", "value": 42}}, + ) # Assert line = stream.getvalue().strip().splitlines()[-1] payload = json.loads(line) assert payload["level"] == "ERROR" assert payload["msg"] == "hello world" - assert payload["event"] == "smoke" - assert payload["value"] == 42 + assert payload["kind"] == "test.smoke" + assert payload["kv"] == {"event": "smoke", "value": 42}