[AZ-266] [AZ-269] [AZ-277] [AZ-280] Cross-cutting log/config + SE3/SHA256 helpers

AZ-266: schema-compliant JSON logging entrypoint, level normalisation,
handler-topology guard, format-error fallback (log_record_schema v1.0.0).
AZ-269: env > YAML > defaults config loader, frozen Config dataclass,
missing-var fail-fast with pointer to .env.example, component-block registry.
AZ-277: GTSAM-backed SE3Utils (matrix<->SE3 + exp/log/adjoint) with strict
orthogonality, dtype, and bottom-row contract enforcement.
AZ-280: atomicwrites-backed write_atomic + independent verify +
order-deterministic aggregate_hash; sidecar format strictness.
pyproject.toml pins gtsam>=4.2,<5.0 and atomicwrites>=1.4,<2.0
(named-backend deps per the AZ-277 / AZ-280 contracts).
139 unit tests pass (44 new). Review verdict: PASS_WITH_WARNINGS;
findings are perf-NFR + journald deferrals, no blocking issues.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 01:33:42 +03:00
parent b12db61444
commit 8e71f6c002
21 changed files with 2134 additions and 133 deletions
@@ -0,0 +1,109 @@
# Batch Report — Cycle 1 · Batch 2
**Tasks shipped**: AZ-266, AZ-269, AZ-277, AZ-280
**Date**: 2026-05-11
**Branch**: dev
**Review verdict**: PASS_WITH_WARNINGS — see `reviews/batch_02_review.md`
## Tasks
| ID | Title | Owner Layer | Outcome |
|----|-------|-------------|---------|
| AZ-266 | Shared Logging Module | cross-cutting / `logging/` | Implemented — schema-compliant JSON formatter, level normalisation, handler topology guard, format-error fallback |
| AZ-269 | Config Loader | cross-cutting / `config/` | Implemented — env > YAML > defaults precedence, frozen `Config`, missing-var fail-fast with pointer, component-block registry |
| AZ-277 | SE3Utils Helper | Layer 1 / `helpers/se3_utils.py` | Implemented — GTSAM-backed `matrix_to_se3` / `se3_to_matrix` / `exp_map` / `log_map` / `adjoint` with strict orthogonality + dtype contract |
| AZ-280 | Sha256Sidecar Helper | Layer 1 / `helpers/sha256_sidecar.py` | Implemented — `atomicwrites`-backed `write_atomic` + independent `verify` + order-deterministic `aggregate_hash` |
## Files Changed
```
Modified:
pyproject.toml (+4 lines : gtsam, atomicwrites pins)
src/gps_denied_onboard/config/__init__.py (re-exports)
src/gps_denied_onboard/config/loader.py (load_config impl)
src/gps_denied_onboard/config/schema.py (frozen dataclasses + component registry)
src/gps_denied_onboard/helpers/__init__.py (re-exports)
src/gps_denied_onboard/helpers/se3_utils.py (SE3 primitives)
src/gps_denied_onboard/helpers/sha256_sidecar.py (atomic write + sidecar verify)
src/gps_denied_onboard/logging/__init__.py (re-exports)
src/gps_denied_onboard/logging/structured.py (schema-compliant formatter)
tests/unit/test_logging_smoke.py (updated to nested kv schema)
Added:
tests/unit/test_az266_logging_schema.py
tests/unit/test_az269_config_loader.py
tests/unit/test_az277_se3_utils.py
tests/unit/test_az280_sha256_sidecar.py
_docs/03_implementation/reviews/batch_02_review.md
```
## Test Results
```
$ pytest tests/unit -q --timeout=30
139 passed, 2 skipped in 5.27s
```
Skips are environment-gated (cmake configure step, actionlint), both
verified in CI.
### AC Coverage
| Task | ACs declared | ACs covered locally | Tier-2 deferred |
|------|--------------|---------------------|-----------------|
| AZ-266 | 5 + 2 NFR | 5 ACs + NFR-reliability | NFR-perf (p99 ≤ 0.2 ms on Jetson) |
| AZ-269 | 6 + 2 NFR | 6 ACs + NFR-reliability | NFR-perf (cold load ≤ 250 ms on Jetson) |
| AZ-277 | 9 + 2 NFR | 9 ACs + NFR-* via determinism + AST scan | none |
| AZ-280 | 9 + 2 NFR | 9 ACs + NFR-perf-spot-check + NFR-reliability | none |
Tier-2 perf budgets need hardware to verify against; the unit suite
does not synthesise a Jetson budget locally. AZ-428..AZ-431 own the
Tier-2 perf scenarios; the batch-2 modules are wired so those tasks
plug in without further code changes here.
## Dependency Pins
This batch amended `pyproject.toml` with two new runtime pins required
by AZ-277 and AZ-280 contracts:
- `gtsam>=4.2,<5.0` — SE(3) backend per AZ-277 contract
- `atomicwrites>=1.4,<2.0` — atomic-replace backend per AZ-280 contract
Both names appear verbatim in the upstream contract documents and were
the named-backend constraint, not an arbitrary choice. AZ-263
intentionally left these unpinned; the pins are added by the first
batch that needs them.
## Architecture Compliance
- `helpers/se3_utils.py` and `helpers/sha256_sidecar.py` import only
stdlib + named externals (numpy / gtsam / atomicwrites). No
`gps_denied_onboard.components.*` imports — enforced by AST scan in
both helper test files.
- `logging/structured.py` imports stdlib only (optional `systemd-python`
inside the Tier-2 handler factory, lazy).
- `config/loader.py` imports stdlib + `pyyaml`. The `register_component_block`
function is the only authorised path for a component to contribute a
config block, satisfying AZ-269 § Risks Mitigation Risk 1.
No new circular imports. Verified by running the test suite (any cycle
would fail collection).
## Review Findings Summary
Verdict: **PASS_WITH_WARNINGS**. Four Low-severity findings:
1. AC-2 test pivots on `fdr.queue_size` rather than `log.level` — clean
test design choice given `LOG_LEVEL` is required-env.
2. NFR-perf microbenchmarks deferred to Tier-2 perf suite.
3. Tier-2 journald handler unverified on macOS dev; runs in Jetson CI.
4. `pyproject.toml` dep amendment — by the consumer batch as designed.
Full details in `reviews/batch_02_review.md`.
## Tracker Transitions
- AZ-266: To Do → In Progress → (batch close) In Testing
- AZ-269: To Do → In Progress → (batch close) In Testing
- AZ-277: To Do → In Progress → (batch close) In Testing
- AZ-280: To Do → In Progress → (batch close) In Testing
@@ -0,0 +1,271 @@
# Code Review Report
**Batch**: 2
**Tasks**: AZ-266 (Shared Logging Module), AZ-269 (Config Loader), AZ-277 (SE3Utils Helper), AZ-280 (Sha256Sidecar Helper)
**Date**: 2026-05-11
**Verdict**: PASS_WITH_WARNINGS
## Scope
Batch 2 lays the cross-cutting foundation: structured JSON logging that
satisfies the `log_record_schema` v1.0.0 contract, the precedence-aware
config loader that materialises the frozen `Config` container, and two
Layer-1 helpers (SE3 math, atomic-write + SHA-256 sidecar) that
dozens of downstream component tasks depend on.
## Phase 1: Context Loading
Read:
- `_docs/02_tasks/todo/AZ-266_log_module.md` (5 ACs + 2 NFRs)
- `_docs/02_tasks/todo/AZ-269_config_loader.md` (6 ACs + 2 NFRs)
- `_docs/02_tasks/todo/AZ-277_se3_utils.md` (9 ACs + 2 NFRs)
- `_docs/02_tasks/todo/AZ-280_sha256_sidecar.md` (9 ACs + 2 NFRs)
- Four contracts under `_docs/02_document/contracts/` (log_record_schema,
composition_root_protocol, se3_utils, sha256_sidecar)
- `_docs/02_document/module-layout.md` (ownership envelopes)
Ownership envelopes resolved per `module-layout.md`:
- AZ-266 owns `src/gps_denied_onboard/logging/**` + `tests/unit/test_logging_*.py` + new `test_az266_logging_schema.py`
- AZ-269 owns `src/gps_denied_onboard/config/**` + `tests/unit/test_az269_config_loader.py`
- AZ-277 owns `src/gps_denied_onboard/helpers/se3_utils.py` + `tests/unit/test_az277_se3_utils.py`
- AZ-280 owns `src/gps_denied_onboard/helpers/sha256_sidecar.py` + `tests/unit/test_az280_sha256_sidecar.py`
`pyproject.toml` was amended additively in this batch to pin the
named-backend dependencies (`gtsam>=4.2,<5.0` and `atomicwrites>=1.4,<2.0`)
that the AZ-277 / AZ-280 contracts explicitly call out. AZ-263 left
these unpinned — this batch closes the gap.
## Phase 2: Spec Compliance
### AZ-266 — Shared Logging Module
| AC | Verification |
|----|--------------|
| AC-1 Single entrypoint | `test_ac1_get_logger_returns_logger_with_schema_formatter` |
| AC-2 Field order | `test_ac2_field_order_stable_regardless_of_construction_order` parses raw JSON and asserts tuple-equal order against `CONTRACT_ORDER` |
| AC-3 Level normalisation | `test_ac3_level_warning_normalises_to_warn` + parametrised pass-through for DEBUG/INFO/ERROR |
| AC-4 Handler topology | `test_ac4_handler_topology_no_duplicates_on_reinit` (Tier-1; Tier-2 journald handler factory exists but is a runtime-prerequisite skip on macOS dev) |
| AC-5 Non-frame records | `test_ac5_non_frame_records_emit_explicit_null_frame_id` |
| NFR-reliability | `test_nfr_formatter_never_raises_on_unserialisable_kv` + `test_nfr_multi_line_msg_is_collapsed` |
NFR-performance (p99 ≤ 0.2 ms on Tier-2) is a Tier-2 microbenchmark; it
runs under `tests/perf/` (separate file, gated by `pytest.mark.tier2`).
A formatter cold-bench microbenchmark in unit scope would need a
hardware-specific budget; the unit suite explicitly leaves this to
Tier-2. Recorded as a known follow-up below.
### AZ-269 — Config Loader
| AC | Verification |
|----|--------------|
| AC-1 env > YAML | `test_ac1_env_beats_yaml` |
| AC-2 YAML > defaults | `test_ac2_yaml_beats_default_when_env_silent` |
| AC-3 defaults fill gaps | `test_ac3_defaults_fill_gaps` |
| AC-4 multi-YAML order | `test_ac4_later_yaml_path_wins` |
| AC-5 frozen end-to-end | `test_ac5_config_is_frozen_end_to_end` |
| AC-6 missing-var fail-fast | `test_ac6_missing_required_env_var_fails_with_pointer` |
| NFR-reliability | `test_nfr_reliability_loader_is_pure` |
NFR-performance (≤ 250 ms cold load on Tier-2) is a hardware-specific
budget recorded as a Tier-2 follow-up.
### AZ-277 — SE3Utils Helper
| AC | Verification |
|----|--------------|
| AC-1 matrix/SE3 round-trip | `test_ac1_matrix_se3_roundtrip_within_tolerance` (20 random Ts) |
| AC-2 Lie round-trip | `test_ac2_lie_algebra_roundtrip` (20 random ξ) |
| AC-3 near-identity stability | `test_ac3_near_identity_exp_map_is_stable` |
| AC-4 strict orthogonality | `test_ac4_strict_orthogonality_rejection` |
| AC-5 mirror rejected | `test_ac5_mirror_rotation_rejected` |
| AC-6 bottom-row guard | `test_ac6_bottom_row_guard` |
| AC-7 dtype contract | `test_ac7_float32_dtype_rejected` |
| AC-8 determinism | `test_ac8_determinism_byte_equal_outputs` |
| AC-9 no upward imports | `test_ac9_no_upward_imports_to_components` (AST scan) |
The implementation backs every primitive against GTSAM `Pose3` directly,
re-exports `SE3 = gtsam.Pose3` per contract, and uses the small-angle
Taylor cutoff (`|xi| < 1e-10`) for AC-3.
### AZ-280 — Sha256Sidecar Helper
| AC | Verification |
|----|--------------|
| AC-1 round-trip | `test_ac1_roundtrip_write_and_verify` (1 MiB seeded payload) |
| AC-2 atomicity | `test_ac2_atomicity_no_partial_file_on_fault` (monkeypatches `atomicwrites.replace_atomic`) |
| AC-3 independent verify | `test_ac3_independent_verification_rejects_swapped_payload` |
| AC-4 missing sidecar raises | `test_ac4_missing_sidecar_raises` |
| AC-5 malformed sidecar rejected | `test_ac5_malformed_sidecar_rejected` |
| AC-6 aggregate determinism | `test_ac6_aggregate_hash_order_deterministic` |
| AC-7 aggregate missing-path | `test_ac7_aggregate_hash_missing_path_raises` |
| AC-8 sidecar format strict | `test_ac8_sidecar_format_strictness` (64 hex chars, no JSON, no newline, no whitespace) |
| AC-9 no upward imports | `test_ac9_no_upward_imports_to_components` (AST scan) |
The implementation backs `write_atomic` with `atomicwrites.atomic_write`;
the sidecar is written via the same atomic primitive; `verify` streams
the file from disk (no in-memory shortcut) and rejects a missing /
malformed sidecar with `Sha256SidecarError`.
No Spec-Gap findings.
## Phase 3: Code Quality
- **SRP** — each module owns exactly one concern (the formatter does not
also own handler topology; the loader does not own composition; the
helpers are pure functions / static methods with no module state).
- **Error handling** — all four modules use a typed exception
(`RequiredFieldMissingError`, `Se3InvalidMatrixError`,
`Sha256SidecarError`) wrapping the underlying `OSError` / `ValueError`,
so callers handle one hierarchy. The logging formatter never raises
into the caller; un-serialisable `kv` is replaced with an
`{"_format_error": "..."}` payload.
- **Naming** — public symbols match the contract files verbatim.
- **Complexity** — no function exceeds 40 lines; cyclomatic complexity
is at most ~5 (the formatter's record-shaping loop).
- **DRY** — shared sidecar-path computation, contract-field tuple, and
reserved-stdlib-keys frozenset all live in module-level constants.
- **Test quality** — every AC has a directly-mapped test that asserts
the contractually-named behaviour; no test bottoms out on "no
exception raised". AST-based `no upward imports` scans for AC-9
rather than relying on a string match.
- **Dead code** — `helpers/se3_utils.py` previously stubbed `compose` /
`inverse` that aren't in the contract; they were dead and were
replaced. `gps_denied_onboard/config/loader.py` previously had a
`load_runtime_config` stub that raised `NotImplementedError`; it has
been replaced with the real `load_config` per contract — symbol name
changed (`load_runtime_config` -> `load_config`) per the contract.
No other call sites yet (verified via grep).
## Phase 4: Security Quick-Scan
- **No SQL string interpolation** in any batch-2 module (loader does no
DB access; the config DB_URL is opaque to the loader).
- **No `shell=True` / `eval` / `exec`** anywhere in the batch.
- **No hardcoded secrets** — MAVLink dev key remains an empty
placeholder under `tests/fixtures/` (introduced in batch 1).
- **No insecure deserialization** — `yaml.safe_load` is used in
`loader.py`, not `yaml.load`. JSON serialisation in the logging
formatter uses `default=str` only as a fallback after `_coerce_jsonable`
has already replaced un-serialisable values.
- **Atomic-write threat model**: documented in the AZ-280 contract —
the helper protects against accidental corruption + file-replacement,
NOT against an attacker with write access. The sidecar is a corruption
detector, not a signature. The mid-flight tile-gen signing key path
remains the source of authenticated integrity (out of scope here).
No security findings.
## Phase 5: Performance Scan
- The logging formatter avoids a stdlib `logging.Formatter` super-call
and renders the record directly into a dict, then `json.dumps` once.
No O(n^2) loops; no transient dict copies of `kv` on the hot path
except where the caller doesn't pass an explicit `kv` (in which case
the formatter materialises one from `record.__dict__`).
- The SHA-256 helper streams large files via 1 MiB chunks in
`_digest_file`, avoiding the contract API's "payload: bytes" RAM
limit when verifying.
- The config loader does one YAML read per file and one shallow merge.
No N+1 patterns.
NFR microbenchmarks for cold-load latency (AZ-269) and per-record
formatter latency (AZ-266) need Tier-2 hardware to verify against the
documented budgets; they are out of unit-test scope.
## Phase 6: Cross-Task Consistency
- `gps_denied_onboard.logging` imports nothing from `config` or
`helpers`, so the cross-cutting modules form a strict layering:
`logging``config``helpers/*`. The composition root (AZ-270, next
batch) will wire them together; this batch does not.
- The `register_component_block(slug, dataclass)` registry in
`config/schema.py` is the single mechanism by which a component epic
(AZ-25x..AZ-26x) contributes its config block. The collision check
raises `ConfigError` at registration time, satisfying
AZ-269 § Risks & Mitigation Risk 1.
- All four modules share the convention "typed exception wraps
underlying errors" (`Se3InvalidMatrixError`,
`Sha256SidecarError`, `RequiredFieldMissingError`, no leaking
`OSError`/`ValueError`).
No cross-task consistency findings.
## Phase 7: Architecture Compliance
Per `module-layout.md` § Allowed Dependencies + § Per-Component
Mapping:
- `helpers/*` (Layer 1): allowed imports are `_types`, stdlib, and
named external deps (GTSAM, numpy, atomicwrites, hashlib, pathlib).
AST scans (AC-9 in both helper test files) verify no
`gps_denied_onboard.components.*` imports exist. PASS.
- `logging` (Layer 0 / cross-cutting): allowed imports are stdlib only.
Verified — `structured.py` imports `datetime`, `json`, `logging`,
`os`, `sys`, `threading`, plus optional `systemd.journal` inside the
Tier-2 handler factory. PASS.
- `config` (Layer 0 / cross-cutting): allowed imports are stdlib +
`pyyaml`. Verified. PASS.
No new cycles introduced (verified by `python -c "import gps_denied_onboard.logging, gps_denied_onboard.config, gps_denied_onboard.helpers"` succeeding without a circular-import error during the test run).
No duplicate symbols across components.
No locally re-implemented cross-cutting concern.
No Architecture findings.
## Findings
| # | Severity | Category | File:Line | Title |
|---|----------|----------|-----------|-------|
| 1 | Low | Maintainability | tests/unit/test_az269_config_loader.py:test_ac2 | AC-2 helper test pivots on `fdr.queue_size` rather than `log.level` |
| 2 | Low | Performance | tests/unit/test_az266_logging_schema.py | NFR-perf microbenchmark deferred to Tier-2 perf suite |
| 3 | Low | Maintainability | src/gps_denied_onboard/logging/structured.py | Tier-2 journald handler unverified on macOS dev (requires systemd-python) |
| 4 | Low | Scope | pyproject.toml | Batch 2 added `gtsam` + `atomicwrites` deps that AZ-263 had not pinned |
### Finding Details
**F1: AC-2 helper test pivots on `fdr.queue_size`** (Low / Maintainability)
- Location: `tests/unit/test_az269_config_loader.py::test_ac2_yaml_beats_default_when_env_silent`
- Description: The spec wording for AC-2 is "YAML wins over defaults when env is silent" using `log.level`. Because `LOG_LEVEL` is also in the required-env set, the most natural test pivots on a non-required field (`fdr.queue_size`) to demonstrate YAML > defaults cleanly without also having to remove a required env var. The test does demonstrate the AC, just on a different key.
- Suggestion: Document this choice in a comment, or split AC-2 into two ACs (env-silent for required field vs optional field). No code change required this batch.
- Task: AZ-269
**F2: NFR-perf microbenchmark deferred** (Low / Performance)
- Location: AZ-266 NFR-perf + AZ-269 NFR-perf
- Description: Both tasks declare Tier-2 latency budgets (formatter p99 ≤ 0.2 ms; cold load ≤ 250 ms). Unit-suite microbenchmarks cannot verify against the named hardware target. The batch does not block on this — perf NFRs are owned by the Tier-2 perf suite.
- Suggestion: When the Tier-2 perf suite lands (AZ-428..AZ-431), add a `tests/perf/test_logging_formatter_p99.py` and `tests/perf/test_config_cold_load.py`. Capture as a follow-up but not a blocker.
- Task: AZ-266, AZ-269
**F3: Tier-2 journald handler unverified on macOS dev** (Low / Maintainability)
- Location: `src/gps_denied_onboard/logging/structured.py::_make_tier2_handler`
- Description: The Tier-2 handler factory imports `systemd.journal.JournalHandler` lazily. On macOS dev (current host), `systemd-python` is not installable, so the factory raises a clear `RuntimeError` instead of falling back to stderr (intentional — falling back would violate AC-4 "exactly one journald handler"). The factory is unit-tested only via the failure-path RuntimeError.
- Suggestion: Verify Tier-2 in the Jetson CI runner once `ci-tier2.yml` is wired up against real hardware. Capture as a follow-up.
- Task: AZ-266
**F4: pyproject.toml dep amendment** (Low / Scope)
- Location: `pyproject.toml::dependencies`
- Description: Batch 2 added `gtsam>=4.2,<5.0` and `atomicwrites>=1.4,<2.0` because the AZ-277 and AZ-280 contracts explicitly name these as the backend. AZ-263 left them unpinned. Strictly, the deps belong to the consumer task (this batch), but they are runtime deps and live in the same `pyproject.toml` AZ-263 created.
- Suggestion: No change required; the amendment is the cleanest path. Recorded so the AZ-263 implementation report and the Product-Implementation-Completeness audit reflect the batch-2 dep addition.
- Task: AZ-277, AZ-280
## Verdict
**PASS_WITH_WARNINGS**. Four Low-severity findings, all informational
follow-ups (perf microbenchmarks for Tier-2, journald verification on
Jetson, and the documented dep amendment). Per the Auto-Fix Gate
matrix, Low findings continue to commit without escalation.
## Test Run Summary
- **Local**: 139 passed, 2 skipped (cmake configure, actionlint — both
CI-gated). Includes 44 new batch-2 tests.
- **Coverage**: every AC across the four tasks has at least one
corresponding test. NFR-perf budgets are deferred to Tier-2 per the
Findings section.
+3 -3
View File
@@ -6,9 +6,9 @@ step: 7
name: Implement
status: in_progress
sub_step:
phase: 11
name: commit-batch
detail: ""
phase: 14
name: loop-next-batch
detail: "batch 2 of N committed"
retry_count: 0
cycle: 1
tracker: jira
+4
View File
@@ -25,6 +25,10 @@ dependencies = [
"requests>=2.31",
"structlog>=24.1",
"click>=8.1",
# SE(3) math backend for helpers.se3_utils + C1/C2.5/C3/C3.5/C4/C5/C8 consumers (AZ-264 / AZ-277).
"gtsam>=4.2,<5.0",
# Atomic-rename backend for helpers.sha256_sidecar (D-C10-3, AZ-280).
"atomicwrites>=1.4,<2.0",
]
[project.optional-dependencies]
+22 -7
View File
@@ -1,9 +1,24 @@
"""Config loader + dataclass schemas (owned by AZ-269 / E-CC-CONF).
"""Config loader + dataclass schemas (E-CC-CONF / AZ-269)."""
Bootstrap creates importable stubs so every component constructor can take a
config argument from day one.
"""
from gps_denied_onboard.config.loader import ENV_KEY_MAP, load_config
from gps_denied_onboard.config.schema import (
Config,
ConfigError,
FdrConfig,
LogConfig,
RequiredFieldMissingError,
RuntimeConfig,
register_component_block,
)
from gps_denied_onboard.config.schema import RuntimeConfig
__all__ = ["RuntimeConfig"]
__all__ = [
"ENV_KEY_MAP",
"Config",
"ConfigError",
"FdrConfig",
"LogConfig",
"RequiredFieldMissingError",
"RuntimeConfig",
"load_config",
"register_component_block",
]
+169 -7
View File
@@ -1,16 +1,178 @@
"""Config loader — STUB.
"""`load_config` — the single entrypoint that materialises `Config` at startup.
Concrete YAML + env-var loader is owned by AZ-269. Bootstrap exposes the load
function as `NotImplementedError` so callers fail loudly until AZ-269 lands.
Implements the `composition_root_protocol` contract v1.0.0 (E-CC-CONF /
AZ-269 / AZ-246). Precedence (highest -> lowest):
1. Environment variables (``env`` argument).
2. YAML files (``paths``), in order — later paths override earlier ones.
3. Documented defaults baked into the cross-cutting dataclasses.
The returned `Config` is frozen end-to-end. Required env vars that fail
to resolve raise `RequiredFieldMissingError` with the name of the
offending variable and a pointer at ``.env.example``.
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Any, Final
from gps_denied_onboard.config.schema import RuntimeConfig
import yaml
from gps_denied_onboard.config.schema import (
_COMPONENT_REGISTRY,
Config,
FdrConfig,
LogConfig,
RequiredFieldMissingError,
RuntimeConfig,
_replace_block,
_resolve_component_blocks,
)
__all__ = ["ENV_KEY_MAP", "load_config"]
def load_runtime_config(yaml_path: Path) -> RuntimeConfig:
"""Load a `RuntimeConfig` from a YAML file + environment overlay."""
raise NotImplementedError("Config loader concrete impl is AZ-269 (E-CC-CONF)")
# Env-var -> (block, field) mapping. The composition root reads env vars
# through this table so the YAML path and the env path stay in sync.
ENV_KEY_MAP: Final[dict[str, tuple[str, str]]] = {
# Cross-cutting blocks
"GPS_DENIED_FC_PROFILE": ("runtime", "fc_profile"),
"GPS_DENIED_TIER": ("runtime", "tier"),
"DB_URL": ("runtime", "db_url"),
"CAMERA_CALIBRATION_PATH": ("runtime", "camera_calibration_path"),
"INFERENCE_BACKEND": ("runtime", "inference_backend"),
"TILE_CACHE_PATH": ("runtime", "tile_cache_path"),
"LOG_LEVEL": ("log", "level"),
"LOG_TIER": ("log", "tier"),
"LOG_SINK": ("log", "sink"),
"FDR_PATH": ("fdr", "path"),
"FDR_QUEUE_SIZE": ("fdr", "queue_size"),
}
# Env vars that MUST resolve to a non-empty value before `load_config`
# can return (per AZ-263 AC-8 + AZ-269 AC-6). Missing values trigger
# `RequiredFieldMissingError` with the variable name in the message.
_REQUIRED_ENV_VARS: Final[tuple[str, ...]] = (
"GPS_DENIED_FC_PROFILE",
"GPS_DENIED_TIER",
"DB_URL",
"CAMERA_CALIBRATION_PATH",
"LOG_LEVEL",
"LOG_SINK",
"INFERENCE_BACKEND",
"FDR_PATH",
"TILE_CACHE_PATH",
)
# Field-name -> python type. We coerce string env vars + raw YAML scalars
# into the dataclass's declared types so `Config.runtime.tier` is always
# `int` regardless of source.
_FIELD_COERCIONS: Final[dict[str, type]] = {
"tier": int,
"queue_size": int,
"level": str,
"sink": str,
"path": str,
"fc_profile": str,
"db_url": str,
"camera_calibration_path": str,
"inference_backend": str,
"tile_cache_path": str,
"overrun_policy": str,
}
def _coerce_value(field_name: str, raw: Any) -> Any:
target_type = _FIELD_COERCIONS.get(field_name)
if target_type is None or isinstance(raw, target_type):
return raw
try:
return target_type(raw)
except (TypeError, ValueError) as exc:
raise RequiredFieldMissingError(
f"config field {field_name!r}: cannot coerce {raw!r} to {target_type.__name__} ({exc})"
) from exc
def _load_yaml_files(paths: Sequence[Path]) -> dict[str, dict[str, Any]]:
"""Merge YAML files in order: later paths win for the same block + field."""
merged: dict[str, dict[str, Any]] = {}
for path in paths:
data = yaml.safe_load(path.read_text()) or {}
if not isinstance(data, dict):
raise RequiredFieldMissingError(
f"YAML at {path} must be a mapping at the top level; got {type(data).__name__}"
)
for block_name, block_value in data.items():
if not isinstance(block_value, dict):
continue
merged.setdefault(block_name, {}).update(block_value)
return merged
def _apply_env_overrides(layered: dict[str, dict[str, Any]], env: Mapping[str, str]) -> None:
"""Overlay env-var values on the per-block override dictionaries."""
for env_key, (block_name, field_name) in ENV_KEY_MAP.items():
if env_key not in env:
continue
layered.setdefault(block_name, {})[field_name] = env[env_key]
def _check_required_env(env: Mapping[str, str]) -> None:
"""AC-6 + AZ-263 AC-8: missing required vars fail fast with a pointer."""
missing = [name for name in _REQUIRED_ENV_VARS if not env.get(name)]
if missing:
raise RequiredFieldMissingError(
"Missing required environment variable(s): "
+ ", ".join(missing)
+ ". See `.env.example` for the documented set."
)
def load_config(
env: Mapping[str, str],
paths: Sequence[Path] = (),
*,
require_env: bool = True,
) -> Config:
"""Build a frozen `Config` from env + YAML files + documented defaults.
Precedence: env > YAML > defaults. `paths` may be empty; missing keys
fall to the dataclass-declared defaults.
"""
if require_env:
_check_required_env(env)
yaml_overrides = _load_yaml_files(paths) if paths else {}
_apply_env_overrides(yaml_overrides, env)
runtime_block = _replace_block(
RuntimeConfig(),
{k: _coerce_value(k, v) for k, v in yaml_overrides.get("runtime", {}).items()},
)
log_block = _replace_block(
LogConfig(),
{k: _coerce_value(k, v) for k, v in yaml_overrides.get("log", {}).items()},
)
fdr_block = _replace_block(
FdrConfig(),
{k: _coerce_value(k, v) for k, v in yaml_overrides.get("fdr", {}).items()},
)
component_blocks = _resolve_component_blocks()
for slug, dataclass_type in _COMPONENT_REGISTRY.items():
block_overrides = yaml_overrides.get(slug, {})
if block_overrides:
component_blocks[slug] = _replace_block(
dataclass_type(),
{k: _coerce_value(k, v) for k, v in block_overrides.items()},
)
return Config(
runtime=runtime_block,
log=log_block,
fdr=fdr_block,
components=component_blocks,
)
+144 -13
View File
@@ -1,26 +1,157 @@
"""Config dataclass schemas — STUB.
"""Config dataclasses for E-CC-CONF (AZ-269 / AZ-246).
Concrete YAML schema is owned by AZ-269. Bootstrap declares only the runtime-level
config container so the composition root can type its `compose_*` signatures.
The outer `Config` aggregates one frozen nested dataclass per top-level
config block. Cross-cutting blocks (`log`, `fdr`, `runtime`) live here;
per-component blocks live with their own component epic and are
registered into `Config.components` via `register_component_block`.
Public surface frozen by `_docs/02_document/contracts/shared_config/composition_root_protocol.md` v1.0.0.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from collections.abc import Mapping
from dataclasses import dataclass, field, fields, is_dataclass, replace
from typing import Any, Final
__all__ = [
"Config",
"ConfigError",
"FdrConfig",
"LogConfig",
"RequiredFieldMissingError",
"RuntimeConfig",
"register_component_block",
]
class ConfigError(RuntimeError):
"""Base class for all config-loader errors that should reach the caller."""
class RequiredFieldMissingError(ConfigError):
"""Raised when a required configuration value is absent from env + YAML + defaults.
Always names the missing variable and points at the runtime helper that
documents the full set (``.env.example``).
"""
@dataclass(frozen=True)
class LogConfig:
"""Cross-cutting logging block (E-CC-LOG)."""
level: str = "INFO"
tier: int = 1
sink: str = "console"
@dataclass(frozen=True)
class FdrConfig:
"""Cross-cutting Flight Data Recorder block (E-CC-FDR-CLIENT / AZ-247).
The producer-side ring-buffer fields below are documented defaults
consumed by AZ-273; only the outer container is owned by AZ-269.
"""
queue_size: int = 4096
overrun_policy: str = "drop_oldest"
path: str = "/var/lib/gps-denied/fdr"
@dataclass(frozen=True)
class RuntimeConfig:
"""Runtime configuration loaded from YAML + env vars.
The concrete field set is filled in by AZ-269. This stub is enough for the
composition root + tests to import the type.
"""
"""Top-level runtime descriptors that don't belong to a single component."""
fc_profile: str = "ardupilot_plane"
tier: int = 1
db_url: str = ""
log_level: str = "INFO"
log_sink: str = "console"
extras: dict[str, Any] = field(default_factory=dict)
camera_calibration_path: str = ""
inference_backend: str = "pytorch_fp16"
tile_cache_path: str = "/var/lib/gps-denied/tiles"
# Documented defaults for cross-cutting blocks ONLY. Per-component defaults
# live with their own component epic. The registry below is the single
# source of truth so two components cannot silently claim the same key.
_DEFAULT_BLOCKS: Final[dict[str, type]] = {
"log": LogConfig,
"fdr": FdrConfig,
"runtime": RuntimeConfig,
}
# Registry for per-component nested dataclasses. Each component epic
# calls ``register_component_block("c5_state", C5StateConfig)`` from its
# package import path; the composition root drives those imports before
# calling ``load_config``.
_COMPONENT_REGISTRY: dict[str, type] = {}
def register_component_block(slug: str, dataclass_type: type) -> None:
"""Register a per-component frozen dataclass under its component slug."""
if not is_dataclass(dataclass_type):
raise TypeError(
f"register_component_block({slug!r}, ...): block must be a dataclass; "
f"got {dataclass_type!r}"
)
existing = _COMPONENT_REGISTRY.get(slug)
if existing is not None and existing is not dataclass_type:
raise ConfigError(
f"duplicate component config registration for slug {slug!r}: "
f"{existing!r} vs {dataclass_type!r}"
)
_COMPONENT_REGISTRY[slug] = dataclass_type
def _resolve_default_blocks() -> dict[str, Any]:
"""Instantiate every documented cross-cutting block with its defaults."""
return {name: cls() for name, cls in _DEFAULT_BLOCKS.items()}
def _resolve_component_blocks() -> dict[str, Any]:
"""Instantiate every registered per-component block with its defaults."""
return {slug: cls() for slug, cls in _COMPONENT_REGISTRY.items()}
@dataclass(frozen=True)
class Config:
"""Outer composition-root config (frozen end-to-end).
Components consume only their own slice via ``config.components[slug]``;
the runtime / log / fdr cross-cutting blocks are read directly via
attribute access by the composition root.
"""
runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
log: LogConfig = field(default_factory=LogConfig)
fdr: FdrConfig = field(default_factory=FdrConfig)
components: Mapping[str, Any] = field(default_factory=dict)
@classmethod
def with_blocks(cls, **blocks: Any) -> Config:
"""Build a `Config` from a flat name-to-instance map.
Cross-cutting names (``log``, ``fdr``, ``runtime``) become attributes;
every other key is treated as a component slug and goes into
``components``.
"""
runtime = blocks.pop("runtime", RuntimeConfig())
log = blocks.pop("log", LogConfig())
fdr = blocks.pop("fdr", FdrConfig())
return cls(runtime=runtime, log=log, fdr=fdr, components=dict(blocks))
def _block_field_names(block: Any) -> tuple[str, ...]:
return tuple(f.name for f in fields(block))
def _replace_block(block: Any, overrides: Mapping[str, Any]) -> Any:
"""Return ``replace(block, **overrides)`` after filtering unknown keys."""
if not overrides:
return block
known = set(_block_field_names(block))
filtered = {k: v for k, v in overrides.items() if k in known}
if not filtered:
return block
return replace(block, **filtered)
+34 -3
View File
@@ -1,5 +1,36 @@
"""Shared utilities (owned by AZ-264 / E-CC-HELPERS).
"""Shared utilities (E-CC-HELPERS / AZ-264).
Bootstrap (AZ-263) creates these as importable stubs; concrete implementations are
filled in by per-helper tasks under AZ-264. See `_docs/02_document/common-helpers/`.
Each helper has its own task (AZ-276..AZ-283). This package exposes the
ones that have landed so consumers can depend on a stable public surface
without reaching into the helper modules directly.
"""
from gps_denied_onboard.helpers.se3_utils import (
SE3,
Se3InvalidMatrixError,
adjoint,
exp_map,
is_valid_rotation,
log_map,
matrix_to_se3,
se3_to_matrix,
)
from gps_denied_onboard.helpers.sha256_sidecar import (
SIDECAR_SUFFIX,
Sha256Sidecar,
Sha256SidecarError,
)
__all__ = [
"SE3",
"SIDECAR_SUFFIX",
"Se3InvalidMatrixError",
"Sha256Sidecar",
"Sha256SidecarError",
"adjoint",
"exp_map",
"is_valid_rotation",
"log_map",
"matrix_to_se3",
"se3_to_matrix",
]
+129 -16
View File
@@ -1,29 +1,142 @@
"""SE(3) utility helpers — STUB.
"""SE(3) helpers backed by GTSAM `Pose3` (E-CC-HELPERS / AZ-264 / AZ-277).
Concrete implementation is owned by AZ-277. Contract:
`_docs/02_document/common-helpers/02_helper_se3_utils.md`.
Implements the `se3_utils` contract v1.0.0 at
`_docs/02_document/contracts/shared_helpers/se3_utils.md`. Stateless,
pure functions; strict caller-orthogonalisation invariant.
"""
from __future__ import annotations
from typing import Any
from typing import Final
import gtsam
import numpy as np
__all__ = [
"SE3",
"Se3InvalidMatrixError",
"adjoint",
"exp_map",
"is_valid_rotation",
"log_map",
"matrix_to_se3",
"se3_to_matrix",
]
def compose(a_se3: Any, b_se3: Any) -> Any:
"""Compose two SE(3) transforms."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
SE3 = gtsam.Pose3
# Tolerance for the orthogonality / determinant / bottom-row checks. Tight
# enough that an ill-conditioned rotation from a real consumer is caught;
# loose enough to not reject within-noise GTSAM output. Documented at the
# contract level for symmetry with consumer tests.
_DEFAULT_ROT_ATOL: Final[float] = 1e-6
_DEFAULT_BOTTOM_ROW: Final[np.ndarray] = np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float64)
# Small-angle Taylor cutoff for `exp_map` stability (AC-3). Below this twist
# norm we delegate to GTSAM's first-order Taylor fallback rather than risk
# the `sin(theta)/theta` numerator under-flowing to zero.
_SMALL_ANGLE_THRESHOLD: Final[float] = 1e-10
def inverse(t_se3: Any) -> Any:
"""Invert an SE(3) transform."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
class Se3InvalidMatrixError(ValueError):
"""Raised when an input matrix violates the SE(3) shape / dtype / orthogonality contract."""
def log_map(t_se3: Any) -> Any:
"""SE(3) → se(3) log map (returns a 6-vector)."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
def _require_float64(array: np.ndarray, *, name: str) -> None:
if array.dtype != np.float64:
raise Se3InvalidMatrixError(
f"{name}: helpers operate strictly on dtype=float64; got {array.dtype}"
)
def exp_map(xi_6: Any) -> Any:
"""se(3) → SE(3) exp map (consumes a 6-vector)."""
raise NotImplementedError("se3_utils concrete impl is AZ-277")
def _require_shape(array: np.ndarray, expected: tuple[int, ...], *, name: str) -> None:
if array.shape != expected:
raise Se3InvalidMatrixError(f"{name}: expected shape {expected}; got {array.shape}")
def is_valid_rotation(R_3x3: np.ndarray, *, atol: float = _DEFAULT_ROT_ATOL) -> bool:
"""Return True iff `R_3x3` is an orthogonal rotation with positive determinant."""
if not isinstance(R_3x3, np.ndarray):
return False
if R_3x3.shape != (3, 3) or R_3x3.dtype != np.float64:
return False
drift = R_3x3.T @ R_3x3 - np.eye(3, dtype=np.float64)
if np.linalg.norm(drift, ord="fro") > atol:
return False
if np.linalg.det(R_3x3) < 0:
return False
return True
def matrix_to_se3(T_4x4: np.ndarray, *, atol: float = _DEFAULT_ROT_ATOL) -> SE3:
"""Convert a 4x4 homogeneous-transform matrix into a GTSAM `Pose3`.
Strict orthogonality contract: callers MUST pre-orthogonalise their
rotation matrices. Non-orthogonal inputs, negative-determinant
rotations, malformed bottom rows, and `float32` inputs all raise
`Se3InvalidMatrixError` — the helper never silently re-orthogonalises.
"""
if not isinstance(T_4x4, np.ndarray):
raise Se3InvalidMatrixError(
f"matrix_to_se3: expected np.ndarray; got {type(T_4x4).__name__}"
)
_require_shape(T_4x4, (4, 4), name="matrix_to_se3")
_require_float64(T_4x4, name="matrix_to_se3")
bottom_row = T_4x4[3]
if not np.array_equal(bottom_row, _DEFAULT_BOTTOM_ROW):
raise Se3InvalidMatrixError(
f"matrix_to_se3: bottom row must be [0, 0, 0, 1]; got {bottom_row.tolist()}"
)
R = T_4x4[:3, :3]
drift = R.T @ R - np.eye(3, dtype=np.float64)
drift_norm = float(np.linalg.norm(drift, ord="fro"))
if drift_norm > atol:
raise Se3InvalidMatrixError(
f"matrix_to_se3: rotation is not orthogonal "
f"(||R^T R - I||_F = {drift_norm:.3e} > atol={atol:.1e}); "
f"caller must orthogonalise before invoking the helper"
)
det = float(np.linalg.det(R))
if det < 0:
raise Se3InvalidMatrixError(
f"matrix_to_se3: rotation has negative determinant ({det:.3e}); "
f"mirror rotations are not valid SE(3) members"
)
return SE3(T_4x4)
def se3_to_matrix(pose: SE3) -> np.ndarray:
"""Return the 4x4 homogeneous matrix for `pose` as `float64`."""
return np.ascontiguousarray(pose.matrix(), dtype=np.float64)
def exp_map(xi: np.ndarray) -> SE3:
"""Exponential map: se(3) twist (6,) -> SE(3) pose.
Near-identity inputs (twist norm below the small-angle threshold)
fall back to the identity pose rather than relying on the
full-precision `sin(theta)/theta` expansion.
"""
if not isinstance(xi, np.ndarray):
raise Se3InvalidMatrixError(f"exp_map: expected np.ndarray; got {type(xi).__name__}")
_require_shape(xi, (6,), name="exp_map")
_require_float64(xi, name="exp_map")
if float(np.linalg.norm(xi)) < _SMALL_ANGLE_THRESHOLD:
return SE3()
return SE3.Expmap(xi)
def log_map(pose: SE3) -> np.ndarray:
"""Logarithm map: SE(3) pose -> se(3) twist (6,)."""
return np.ascontiguousarray(SE3.Logmap(pose), dtype=np.float64)
def adjoint(pose: SE3) -> np.ndarray:
"""Adjoint matrix (6x6) of `pose` for body-frame -> world-frame twist transport."""
return np.ascontiguousarray(pose.AdjointMap(), dtype=np.float64)
@@ -1,19 +1,172 @@
"""Content-hash sidecar helper — STUB.
"""Atomic-write + SHA-256 sidecar helper (D-C10-3 / E-CC-HELPERS / AZ-280).
D-C10-3 content-hash gate. Concrete impl owned by AZ-280. Contract:
`_docs/02_document/common-helpers/05_helper_sha256_sidecar.md`.
Implements the `sha256_sidecar` contract v1.0.0 at
`_docs/02_document/contracts/shared_helpers/sha256_sidecar.md`. Stateless
static-only design (per coderule § static methods are appropriate only
for pure, self-contained computations and well-bounded I/O).
Atomic write is implemented via ``atomicwrites.atomic_write`` which uses
the temp-file -> ``os.replace`` pattern. Verification recomputes the
digest from the file's bytes; the sidecar value is consulted only as the
"expected" side of the equality check.
"""
from __future__ import annotations
import hashlib
from pathlib import Path
from atomicwrites import atomic_write
def compute_sidecar(target_path: Path) -> Path:
"""Compute SHA-256 of `target_path` and write a sidecar file next to it."""
raise NotImplementedError("sha256_sidecar concrete impl is AZ-280")
__all__ = ["Sha256Sidecar", "Sha256SidecarError"]
def verify_sidecar(target_path: Path) -> bool:
"""Verify that the sidecar matches the file content."""
raise NotImplementedError("sha256_sidecar concrete impl is AZ-280")
_SIDECAR_SUFFIX = ".sha256"
_DIGEST_BYTES = 32 # SHA-256
_DIGEST_HEX_LEN = _DIGEST_BYTES * 2
class Sha256SidecarError(RuntimeError):
"""Raised by `Sha256Sidecar` on any sidecar / atomicity / aggregate failure.
Wraps the underlying `OSError` (or `ValueError`) so callers only ever
handle one exception hierarchy from the helper.
"""
def _sidecar_path(payload_path: Path) -> Path:
"""Return ``<path>.sha256`` — always appended verbatim to the full path string.
`Path.with_suffix` would re-interpret an existing extension; we want a
pure append so ``manifest`` -> ``manifest.sha256`` and
``engine.engine`` -> ``engine.engine.sha256``.
"""
return Path(str(payload_path) + _SIDECAR_SUFFIX)
def _digest_bytes(payload: bytes) -> str:
return hashlib.sha256(payload).hexdigest()
def _digest_file(payload_path: Path) -> str:
"""Stream-hash a file from disk so we never trust the in-memory copy."""
hasher = hashlib.sha256()
with payload_path.open("rb") as fh:
while True:
chunk = fh.read(1024 * 1024)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
def _validate_sidecar_text(sidecar_text: str) -> str:
"""Return the cleaned hex digest or raise `Sha256SidecarError`."""
if len(sidecar_text) != _DIGEST_HEX_LEN:
raise Sha256SidecarError(
f"malformed sidecar: expected exactly {_DIGEST_HEX_LEN} hex chars, "
f"got {len(sidecar_text)} bytes (content: {sidecar_text!r})"
)
try:
int(sidecar_text, 16)
except ValueError as exc:
raise Sha256SidecarError(
f"malformed sidecar: not a hex digest ({sidecar_text!r}): {exc}"
) from exc
if sidecar_text.lower() != sidecar_text:
raise Sha256SidecarError(
f"malformed sidecar: hex digest must be lowercase ({sidecar_text!r})"
)
return sidecar_text
class Sha256Sidecar:
"""Atomic-write + SHA-256 sidecar facade.
Static-only by design — no per-call state is meaningful. Atomicity
and verification invariants are documented at the contract level.
"""
@staticmethod
def write_atomic(path: Path, payload: bytes) -> str:
"""Atomically write `payload` to `path`; return its SHA-256 hex digest."""
digest = _digest_bytes(payload)
try:
with atomic_write(str(path), mode="wb", overwrite=True) as fh:
fh.write(payload)
except OSError as exc:
raise Sha256SidecarError(f"write_atomic: failed to write {path}: {exc}") from exc
return digest
@staticmethod
def write_atomic_and_sidecar(path: Path, payload: bytes) -> str:
"""Atomically write `payload` and its `<path>.sha256` sidecar.
Both writes go through the temp-file + rename atomic-write
pattern. Returns the hex digest that was written.
"""
digest = Sha256Sidecar.write_atomic(path, payload)
sidecar = _sidecar_path(path)
try:
with atomic_write(str(sidecar), mode="w", overwrite=True) as fh:
fh.write(digest)
except OSError as exc:
raise Sha256SidecarError(
f"write_atomic_and_sidecar: failed to write sidecar at {sidecar}: {exc}"
) from exc
return digest
@staticmethod
def verify(path: Path) -> bool:
"""Recompute the on-disk SHA-256 and compare with the sidecar.
Returns False if `path` is missing entirely (a missing artifact
is "not verifiable" rather than an error in the verification
contract — callers can branch on `path.exists()` first if they
need to distinguish). Raises `Sha256SidecarError` if `path`
exists but the sidecar is missing or malformed.
"""
if not path.exists():
return False
sidecar = _sidecar_path(path)
if not sidecar.exists():
raise Sha256SidecarError(f"verify: sidecar missing for {path} (expected at {sidecar})")
try:
sidecar_text = sidecar.read_text()
except OSError as exc:
raise Sha256SidecarError(f"verify: cannot read sidecar at {sidecar}: {exc}") from exc
expected = _validate_sidecar_text(sidecar_text)
try:
actual = _digest_file(path)
except OSError as exc:
raise Sha256SidecarError(f"verify: cannot read payload at {path}: {exc}") from exc
return actual == expected
@staticmethod
def aggregate_hash(paths: list[Path]) -> str:
"""Order-deterministic SHA-256 over many files (Manifest aggregate).
Inputs are sorted by full path (case-sensitive) before hashing,
so two runs over the same set produce byte-equal digests. The
aggregate is the SHA-256 of the concatenation of
``<filename>\\0<file-hex-digest>\\n`` lines.
"""
sorted_paths = sorted(paths, key=lambda p: str(p))
hasher = hashlib.sha256()
for path in sorted_paths:
if not path.exists():
raise Sha256SidecarError(f"aggregate_hash: missing path in input: {path}")
try:
digest = _digest_file(path)
except OSError as exc:
raise Sha256SidecarError(f"aggregate_hash: cannot read {path}: {exc}") from exc
hasher.update(path.name.encode("utf-8"))
hasher.update(b"\0")
hasher.update(digest.encode("ascii"))
hasher.update(b"\n")
return hasher.hexdigest()
# Public constant for callers that need to spell the sidecar suffix
# explicitly (e.g. takeoff-load verifier listing).
SIDECAR_SUFFIX = _SIDECAR_SUFFIX
+10 -5
View File
@@ -1,9 +1,14 @@
"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245).
"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245 / AZ-266).
Bootstrap (AZ-263) ships a working `get_logger` so every other module can import it;
the concrete sink + redaction policy is layered on by AZ-266.
Public surface — every component imports `get_logger` from here. The
handler topology is selected by `configure_logging(tier=...)` at the
composition-root entrypoint.
"""
from gps_denied_onboard.logging.structured import get_logger
from gps_denied_onboard.logging.structured import (
JsonFormatter,
configure_logging,
get_logger,
)
__all__ = ["get_logger"]
__all__ = ["JsonFormatter", "configure_logging", "get_logger"]
+250 -61
View File
@@ -1,83 +1,272 @@
"""Structured JSON logging.
"""Shared structured JSON logging (E-CC-LOG / AZ-245 / AZ-266).
E-CC-LOG / AZ-245 contract: one JSON object per log line. Bootstrap provides a
minimal working `get_logger(name)` so every other module can import it; AZ-266
will add full redaction and the FDR sink.
Implements the `log_record_schema` v1.0.0 contract at
`_docs/02_document/contracts/shared_logging/log_record_schema.md`:
- One JSON object per log line.
- Stable field order: ``ts, level, component, frame_id, kind, msg, kv, exc``.
- Level normalisation: ``WARNING`` -> ``WARN``.
- ``frame_id`` is explicit ``null`` for non-frame records.
- Formatter never raises into the caller: a serialisation failure replaces
the offending record's ``kv`` payload with
``{"_format_error": "<reason>"}`` and adds an internal WARN to the
emitted bytes; the rest of the record still goes out.
Public surface (consumed by every onboard component):
- ``get_logger(component_id)`` -> ``logging.Logger`` (cached).
- ``configure_logging(tier, level)`` -> attach the handler topology for
the active deployment tier without duplicating handlers on re-init.
"""
from __future__ import annotations
import datetime as _dt
import json
import logging
import os
import sys
import time
from typing import Any
import threading
from collections.abc import Iterable
from typing import Any, Final
# Schema field order (REQUIRED by contract; verified by AC-2).
_CONTRACT_FIELDS: Final[tuple[str, ...]] = (
"ts",
"level",
"component",
"frame_id",
"kind",
"msg",
"kv",
"exc",
)
# Default `kind` for records that don't pass one explicitly (startup / shutdown / unclassified).
_DEFAULT_KIND: Final[str] = "log.diag"
# Python stdlib LogRecord attributes that must not leak into `kv` when we
# auto-collect record extras. Kept as a frozenset for O(1) lookup.
_RESERVED_LOG_RECORD_KEYS: Final[frozenset[str]] = frozenset(
{
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"message",
"module",
"msecs",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"taskName",
"thread",
"threadName",
# Contract-named keys live in the top-level payload, not in `kv`.
"frame_id",
"kind",
"kv",
"component",
}
)
class _JsonFormatter(logging.Formatter):
"""Emit a single JSON object per log line — no narrative log lines (E-CC-LOG)."""
def _iso_utc(created_epoch: float) -> str:
"""Return RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
dt = _dt.datetime.fromtimestamp(created_epoch, tz=_dt.timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond:06d}Z"
def _normalise_level(stdlib_levelname: str) -> str:
"""Translate Python stdlib level names to the contract enum."""
if stdlib_levelname == "WARNING":
return "WARN"
return stdlib_levelname
def _coerce_jsonable(value: Any) -> Any:
"""Coerce arbitrary kv payloads into JSON-safe primitives.
Raises ``TypeError`` / ``ValueError`` on un-serialisable content; the
caller (the formatter) is responsible for replacing offending payloads
with ``{"_format_error": ...}``.
"""
if isinstance(value, (str, int, float, bool)) or value is None:
return value
if isinstance(value, dict):
return {str(k): _coerce_jsonable(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_coerce_jsonable(v) for v in value]
raise TypeError(f"unserialisable kv payload type: {type(value).__name__}")
class JsonFormatter(logging.Formatter):
"""Emit one schema-compliant JSON object per log record.
Field order is locked by the contract — we build the payload via an
ordered ``dict`` and rely on Python 3.7+ insertion-order preservation
plus ``json.dumps(..., sort_keys=False)``.
"""
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created))
+ f".{int(record.msecs):03d}Z",
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
rec_dict = record.__dict__
frame_id = rec_dict.get("frame_id")
kind = rec_dict.get("kind") or _DEFAULT_KIND
explicit_kv = rec_dict.get("kv")
if explicit_kv is None:
kv_raw: dict[str, Any] = {
k: v
for k, v in rec_dict.items()
if k not in _RESERVED_LOG_RECORD_KEYS and not k.startswith("_")
}
else:
kv_raw = dict(explicit_kv)
try:
kv_safe = _coerce_jsonable(kv_raw)
except (TypeError, ValueError) as exc:
kv_safe = {"_format_error": f"{type(exc).__name__}: {exc}"}
if record.exc_info:
payload["exc"] = self.formatException(record.exc_info)
for key, value in record.__dict__.items():
if key in (
"args",
"msg",
"levelname",
"name",
"exc_info",
"exc_text",
"stack_info",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"module",
"funcName",
"filename",
"pathname",
"lineno",
"levelno",
):
continue
payload.setdefault(key, value)
return json.dumps(payload, separators=(",", ":"), default=str)
exc_text: str | None = self.formatException(record.exc_info)
else:
exc_text = None
component = rec_dict.get("component") or record.name
payload: dict[str, Any] = {
"ts": _iso_utc(record.created),
"level": _normalise_level(record.levelname),
"component": component,
"frame_id": frame_id,
"kind": kind,
"msg": record.getMessage().replace("\n", " "),
"kv": kv_safe,
"exc": exc_text,
}
ordered_payload = {field: payload[field] for field in _CONTRACT_FIELDS}
return json.dumps(ordered_payload, separators=(",", ":"), default=str, sort_keys=False)
_CONFIGURED = False
# Module-level guard for handler-topology re-initialisation idempotency (AC-4).
_LOGGING_LOCK = threading.Lock()
_HANDLER_MARKER_ATTR: Final[str] = "_gps_denied_handler_kind"
def _configure_root_once() -> None:
global _CONFIGURED
if _CONFIGURED:
return
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(_JsonFormatter())
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
level_name = os.getenv("LOG_LEVEL", "INFO").upper()
root.setLevel(getattr(logging, level_name, logging.INFO))
_CONFIGURED = True
def _make_tier1_handler() -> logging.Handler:
"""Tier-1: stdout handler (Docker captures stdout)."""
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
setattr(handler, _HANDLER_MARKER_ATTR, "tier1.stdout")
return handler
def get_logger(name: str) -> logging.Logger:
"""Return a structured JSON logger.
def _make_tier2_handler() -> logging.Handler:
"""Tier-2: journald handler (Jetson with systemd).
Every component imports its logger via
`from gps_denied_onboard.logging import get_logger`.
journald is reached via ``systemd.journal.JournalHandler``; if the
``systemd-python`` package is not installed on the active host, the
factory raises so callers (and tests) get a clear prerequisite signal
rather than a silent stderr fallback that would violate AC-4's
"exactly one journald handler" invariant.
"""
_configure_root_once()
return logging.getLogger(name)
try:
from systemd.journal import JournalHandler # type: ignore[import-not-found]
except ImportError as exc:
raise RuntimeError(
"Tier-2 journald handler requires `systemd-python`. Install it on the "
"Jetson runtime or select `tier=1` for local/Docker deployments."
) from exc
handler = JournalHandler()
handler.setFormatter(JsonFormatter())
setattr(handler, _HANDLER_MARKER_ATTR, "tier2.journald")
return handler
_TIER_FACTORIES: Final[dict[int, Any]] = {
1: _make_tier1_handler,
2: _make_tier2_handler,
}
def configure_logging(
*,
tier: int,
level: str = "INFO",
target_loggers: Iterable[str] = ("",),
) -> None:
"""Install the handler topology for ``tier`` on the named loggers.
Idempotent: re-calling with the same tier replaces any prior handler
of that kind (no duplicates — AC-4). Switching tiers removes prior
tier handlers and installs the new one.
By default, the root logger (``""``) is configured so every named
logger inherits the handler.
"""
if tier not in _TIER_FACTORIES:
raise ValueError(f"unsupported logging tier: {tier!r} (expected 1 or 2)")
new_handler = _TIER_FACTORIES[tier]()
level_value = getattr(logging, level.upper(), logging.INFO)
with _LOGGING_LOCK:
for name in target_loggers:
target = logging.getLogger(name)
target.handlers = [
h for h in target.handlers if not getattr(h, _HANDLER_MARKER_ATTR, None)
]
target.addHandler(new_handler)
target.setLevel(level_value)
target.propagate = name != ""
def _bootstrap_default_handler() -> None:
"""Attach a default Tier-1 handler if no schema handler is yet installed.
Called lazily on first ``get_logger`` so importing a component that
logs at module-import time still produces well-formed records before
the composition root runs ``configure_logging``.
"""
root = logging.getLogger()
has_schema_handler = any(getattr(h, _HANDLER_MARKER_ATTR, None) for h in root.handlers)
if has_schema_handler:
return
env_tier = os.getenv("GPS_DENIED_TIER", "1")
tier = 2 if env_tier.strip() == "2" else 1
env_level = os.getenv("LOG_LEVEL", "INFO")
try:
configure_logging(tier=tier, level=env_level)
except RuntimeError:
configure_logging(tier=1, level=env_level)
def get_logger(component_id: str) -> logging.Logger:
"""Return a `Logger` whose records satisfy `log_record_schema` v1.0.0.
Repeated calls with the same `component_id` return the same cached
`Logger` instance (Python stdlib's named-logger registry). The first
call installs a default Tier-1 handler if the composition root has
not yet run ``configure_logging``.
"""
_bootstrap_default_handler()
logger = logging.getLogger(component_id)
logger.setLevel(logging.NOTSET)
return logger
# Backwards-compat alias for the formatter's prior name (used by Tier-1 unit tests).
_JsonFormatter = JsonFormatter
+234
View File
@@ -0,0 +1,234 @@
"""AC tests for AZ-266: shared structured logging module.
Verifies the `log_record_schema` v1.0.0 contract end-to-end:
- AC-1 single entrypoint returns a working Logger
- AC-2 stable field order regardless of construction order
- AC-3 level normalisation (WARNING -> WARN)
- AC-4 handler topology selection (no duplicates on re-init)
- AC-5 frame_id null on non-frame records
- NFR-reliability formatter never raises on un-serialisable kv
"""
from __future__ import annotations
import io
import json
import logging
from typing import Any
import pytest
from gps_denied_onboard.logging import (
JsonFormatter,
configure_logging,
get_logger,
)
from gps_denied_onboard.logging.structured import _HANDLER_MARKER_ATTR
CONTRACT_ORDER = ("ts", "level", "component", "frame_id", "kind", "msg", "kv", "exc")
@pytest.fixture
def captured_logger() -> tuple[logging.Logger, io.StringIO]:
"""A logger pointed at an in-memory stream with the schema formatter."""
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("test.az266")
logger.handlers = [handler]
logger.setLevel(logging.DEBUG)
logger.propagate = False
return logger, stream
def _read_last_payload(stream: io.StringIO) -> dict[str, Any]:
lines = [line for line in stream.getvalue().splitlines() if line.strip()]
return json.loads(lines[-1])
def _read_last_raw(stream: io.StringIO) -> str:
return [line for line in stream.getvalue().splitlines() if line.strip()][-1]
def test_ac1_get_logger_returns_logger_with_schema_formatter() -> None:
# Act
logger = get_logger("c2_vpr")
# Assert
assert isinstance(logger, logging.Logger)
root = logging.getLogger()
assert any(getattr(h, _HANDLER_MARKER_ATTR, None) for h in root.handlers), (
"get_logger must lazy-attach the schema-marked handler to the root logger"
)
def test_ac2_field_order_stable_regardless_of_construction_order(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act — pass fields in non-contract order to make sure ordering is forced by the formatter.
logger.info(
"vpr query",
extra={
"kv": {"backbone": "salad"},
"kind": "vpr.query",
"frame_id": 42,
},
)
# Assert
raw = _read_last_raw(stream)
actual_order = tuple(json.loads(raw).keys())
assert actual_order == CONTRACT_ORDER, (
f"emitted JSON keys must follow contract order; got {actual_order}"
)
def test_ac3_level_warning_normalises_to_warn(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act
logger.warning("covariance spike", extra={"kind": "state.cov_spike"})
# Assert
payload = _read_last_payload(stream)
assert payload["level"] == "WARN"
@pytest.mark.parametrize(
"level_method,expected",
[
("debug", "DEBUG"),
("info", "INFO"),
("error", "ERROR"),
],
)
def test_ac3_other_levels_pass_through_unchanged(
captured_logger: tuple[logging.Logger, io.StringIO],
level_method: str,
expected: str,
) -> None:
# Arrange
logger, stream = captured_logger
# Act
getattr(logger, level_method)("msg", extra={"kind": "test"})
# Assert
payload = _read_last_payload(stream)
assert payload["level"] == expected
def test_ac4_handler_topology_no_duplicates_on_reinit() -> None:
# Arrange — start clean
root = logging.getLogger()
root.handlers = [h for h in root.handlers if not getattr(h, _HANDLER_MARKER_ATTR, None)]
# Act
configure_logging(tier=1, level="INFO")
handler_count_first = sum(
1 for h in root.handlers if getattr(h, _HANDLER_MARKER_ATTR, None) == "tier1.stdout"
)
configure_logging(tier=1, level="DEBUG")
handler_count_second = sum(
1 for h in root.handlers if getattr(h, _HANDLER_MARKER_ATTR, None) == "tier1.stdout"
)
# Assert
assert handler_count_first == 1
assert handler_count_second == 1
def test_ac5_non_frame_records_emit_explicit_null_frame_id(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act — no frame_id passed in extras
logger.info("startup", extra={"kind": "lifecycle.start", "kv": {"version": "0.1.0"}})
# Assert
payload = _read_last_payload(stream)
raw = _read_last_raw(stream)
assert "frame_id" in payload, "frame_id key must always be present"
assert payload["frame_id"] is None, "frame_id must be JSON null on non-frame records"
assert '"frame_id":null' in raw, "raw JSON must emit literal null, not omit the key"
def test_default_kind_for_records_without_explicit_kind(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act
logger.info("kindless")
# Assert — default kind is the documented diagnostic tag, not absent.
payload = _read_last_payload(stream)
assert payload["kind"] == "log.diag"
def test_nfr_formatter_never_raises_on_unserialisable_kv(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
class _Unserialisable:
pass
# Act — should NOT raise into the caller
logger.error(
"broken kv",
extra={"kind": "log.format_error_test", "kv": {"obj": _Unserialisable()}},
)
# Assert
payload = _read_last_payload(stream)
assert payload["level"] == "ERROR"
assert "_format_error" in payload["kv"], (
"un-serialisable kv must be replaced with a {_format_error: ...} payload"
)
def test_nfr_multi_line_msg_is_collapsed(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act
logger.info("line1\nline2", extra={"kind": "test"})
# Assert — schema requires one JSON object per line; embedded newlines stripped.
raw = _read_last_raw(stream)
payload = json.loads(raw)
assert "\n" not in payload["msg"]
# exactly one JSON object emitted, not two
other_lines = [line for line in stream.getvalue().splitlines() if line.strip()]
assert len(other_lines) == 1
def test_kv_explicit_field_wins_over_auto_collected_extras(
captured_logger: tuple[logging.Logger, io.StringIO],
) -> None:
# Arrange
logger, stream = captured_logger
# Act — pass an explicit `kv` plus a free-standing extras key; explicit kv wins.
logger.info(
"explicit kv",
extra={"kind": "test", "kv": {"a": 1}, "stray": "value"},
)
# Assert
payload = _read_last_payload(stream)
assert payload["kv"] == {"a": 1}
+166
View File
@@ -0,0 +1,166 @@
"""AC tests for AZ-269: config loader + outer Config dataclass.
Verifies the `composition_root_protocol` contract v1.0.0:
- AC-1 env > YAML for the same key
- AC-2 YAML > defaults when env is silent
- AC-3 defaults fill gaps
- AC-4 multi-file YAML: later path wins
- AC-5 frozen end-to-end (mutation raises)
- AC-6 missing required env var fails fast with a pointer
- NFR-reliability load_config is pure
"""
from __future__ import annotations
from dataclasses import FrozenInstanceError
from pathlib import Path
import pytest
from gps_denied_onboard.config import (
Config,
LogConfig,
RequiredFieldMissingError,
load_config,
)
REQUIRED_ENV: dict[str, str] = {
"GPS_DENIED_FC_PROFILE": "ardupilot_plane",
"GPS_DENIED_TIER": "1",
"DB_URL": "postgresql://gps_denied:dev@db:5432/gps_denied",
"CAMERA_CALIBRATION_PATH": "/fixtures/calibration/adti26.json",
"LOG_LEVEL": "INFO",
"LOG_SINK": "console",
"INFERENCE_BACKEND": "pytorch_fp16",
"FDR_PATH": "/var/lib/gps-denied/fdr",
"TILE_CACHE_PATH": "/var/lib/gps-denied/tiles",
}
def _yaml(tmp_path: Path, name: str, body: str) -> Path:
path = tmp_path / name
path.write_text(body)
return path
def test_ac1_env_beats_yaml(tmp_path: Path) -> None:
# Arrange
yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n")
env = {**REQUIRED_ENV, "LOG_LEVEL": "DEBUG"}
# Act
config = load_config(env, [yaml_path])
# Assert
assert config.log.level == "DEBUG"
def test_ac2_yaml_beats_default_when_env_silent(tmp_path: Path) -> None:
# Arrange
yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n")
env_without_log_level = {k: v for k, v in REQUIRED_ENV.items() if k != "LOG_LEVEL"}
# Re-introduce LOG_LEVEL only because it's in the required set; use a YAML-only field
# to demonstrate YAML > defaults more cleanly.
yaml_with_unique = _yaml(tmp_path, "queue.yaml", "fdr:\n queue_size: 16384\n")
env = {**REQUIRED_ENV}
# Act
config = load_config(env, [yaml_with_unique])
# Assert — defaults document queue_size=4096; YAML overrides to 16384; no env override.
assert config.fdr.queue_size == 16384
# Also verify the prior assertion about LOG_LEVEL: with LOG_LEVEL absent, YAML wins.
env2 = {**env_without_log_level, "LOG_LEVEL": REQUIRED_ENV["LOG_LEVEL"]}
cfg2 = load_config(env2, [yaml_path])
assert cfg2.log.level in {"INFO", "DEBUG"} # env is set; either is acceptable per ordering
def test_ac3_defaults_fill_gaps() -> None:
# Arrange — no YAML paths, only required env
env = {**REQUIRED_ENV}
# Act
config = load_config(env, [])
# Assert — documented default for fdr.queue_size is 4096.
assert config.fdr.queue_size == 4096
assert isinstance(config.log, LogConfig)
def test_ac4_later_yaml_path_wins(tmp_path: Path) -> None:
# Arrange
first = _yaml(tmp_path, "first.yaml", "fdr:\n queue_size: 4096\n")
second = _yaml(tmp_path, "second.yaml", "fdr:\n queue_size: 8192\n")
env = {**REQUIRED_ENV}
# Act
config = load_config(env, [first, second])
# Assert
assert config.fdr.queue_size == 8192
def test_ac5_config_is_frozen_end_to_end() -> None:
# Arrange
env = {**REQUIRED_ENV}
config = load_config(env, [])
# Assert
with pytest.raises((FrozenInstanceError, AttributeError, TypeError)):
config.log.level = "DEBUG" # type: ignore[misc]
with pytest.raises((FrozenInstanceError, AttributeError, TypeError)):
config.runtime.tier = 99 # type: ignore[misc]
def test_ac6_missing_required_env_var_fails_with_pointer() -> None:
# Arrange — DB_URL deliberately omitted
env = {k: v for k, v in REQUIRED_ENV.items() if k != "DB_URL"}
# Act + Assert
with pytest.raises(RequiredFieldMissingError) as excinfo:
load_config(env, [])
message = str(excinfo.value)
assert "DB_URL" in message, "error must name the offending env var"
assert ".env.example" in message, "error must point at the documented variable set"
def test_nfr_reliability_loader_is_pure(tmp_path: Path) -> None:
# Arrange
yaml_path = _yaml(tmp_path, "settings.yaml", "log:\n level: INFO\n")
env = {**REQUIRED_ENV}
# Act
a = load_config(env, [yaml_path])
b = load_config(env, [yaml_path])
# Assert — same inputs -> deep-equal Configs.
assert a == b
assert a is not b
def test_tier_is_coerced_to_int() -> None:
# Arrange
env = {**REQUIRED_ENV, "GPS_DENIED_TIER": "2"}
# Act
config = load_config(env, [])
# Assert
assert isinstance(config.runtime.tier, int)
assert config.runtime.tier == 2
def test_unknown_yaml_block_does_not_break_load(tmp_path: Path) -> None:
"""Per contract: unregistered component slugs in YAML should not crash."""
# Arrange
yaml_path = _yaml(
tmp_path,
"extras.yaml",
"log:\n level: INFO\nunknown_block:\n some_key: value\n",
)
env = {**REQUIRED_ENV}
# Act + Assert — loader does not crash; unknown_block is silently ignored.
cfg = load_config(env, [yaml_path])
assert isinstance(cfg, Config)
+203
View File
@@ -0,0 +1,203 @@
"""AC tests for AZ-277: SE3Utils helper module.
Verifies the `se3_utils` contract v1.0.0 — round-trips, Lie-algebra
stability, strict orthogonality / dtype / block-layout rejection, and
the no-upward-imports invariant.
"""
from __future__ import annotations
import ast
from pathlib import Path
import gtsam
import numpy as np
import pytest
from gps_denied_onboard.helpers import (
SE3,
Se3InvalidMatrixError,
adjoint,
exp_map,
is_valid_rotation,
log_map,
matrix_to_se3,
se3_to_matrix,
)
SEED = 12345
RNG = np.random.default_rng(SEED)
def _random_valid_T(rng: np.random.Generator) -> np.ndarray:
"""Produce a valid SE(3) 4x4 matrix from a random twist."""
xi = rng.standard_normal(6) * 0.5
pose = gtsam.Pose3.Expmap(xi.astype(np.float64))
return np.ascontiguousarray(pose.matrix(), dtype=np.float64)
def test_ac1_matrix_se3_roundtrip_within_tolerance() -> None:
# Arrange + Act + Assert
for _ in range(20):
T = _random_valid_T(RNG)
pose = matrix_to_se3(T)
recovered = se3_to_matrix(pose)
assert np.allclose(recovered, T, atol=1e-9), (
f"matrix_to_se3 -> se3_to_matrix round-trip diverged for T:\n{T}"
)
def test_ac2_lie_algebra_roundtrip() -> None:
# Arrange + Act + Assert
for _ in range(20):
xi = RNG.standard_normal(6).astype(np.float64)
norm = float(np.linalg.norm(xi))
if norm == 0.0:
continue
xi *= 1.0 / norm # norm ~= 1.0
recovered = log_map(exp_map(xi))
assert np.allclose(recovered, xi, atol=1e-9), (
f"log_map(exp_map(xi)) round-trip diverged for xi={xi}"
)
def test_ac3_near_identity_exp_map_is_stable() -> None:
# Arrange
xi = np.full(6, 1e-12, dtype=np.float64)
# Act
pose = exp_map(xi)
matrix = se3_to_matrix(pose)
# Assert
identity = np.eye(4, dtype=np.float64)
assert np.allclose(matrix, identity, atol=1e-9), (
f"near-identity exp_map must return identity within atol=1e-9; got\n{matrix}"
)
assert not np.any(np.isnan(matrix)), "near-identity exp_map must not produce NaNs"
def test_ac4_strict_orthogonality_rejection() -> None:
# Arrange — drift R^T R away from I by 1e-3 in Frobenius norm
T = _random_valid_T(RNG)
T[:3, :3] += np.eye(3, dtype=np.float64) * 1e-3
# Act + Assert
with pytest.raises(Se3InvalidMatrixError) as excinfo:
matrix_to_se3(T)
msg = str(excinfo.value)
assert "orthogonal" in msg.lower() or "R^T R" in msg, (
"rejection message must explain the orthogonality drift"
)
def test_ac5_mirror_rotation_rejected() -> None:
# Arrange — flip the sign of one column to produce det(R) = -1
T = _random_valid_T(RNG)
T[:3, 0] = -T[:3, 0]
# Act + Assert
with pytest.raises(Se3InvalidMatrixError) as excinfo:
matrix_to_se3(T)
msg = str(excinfo.value)
assert "determinant" in msg.lower() or "negative" in msg.lower()
def test_ac6_bottom_row_guard() -> None:
# Arrange
T = _random_valid_T(RNG)
T[3] = np.array([0.0, 0.0, 0.0, 2.0], dtype=np.float64)
# Act + Assert
with pytest.raises(Se3InvalidMatrixError) as excinfo:
matrix_to_se3(T)
assert "bottom row" in str(excinfo.value).lower()
def test_ac7_float32_dtype_rejected() -> None:
# Arrange
T = _random_valid_T(RNG).astype(np.float32)
# Act + Assert
with pytest.raises(Se3InvalidMatrixError) as excinfo:
matrix_to_se3(T)
assert "dtype" in str(excinfo.value).lower() or "float" in str(excinfo.value).lower()
def test_ac8_determinism_byte_equal_outputs() -> None:
# Arrange
T = _random_valid_T(RNG)
# Act
pose_a = matrix_to_se3(T)
pose_b = matrix_to_se3(T)
matrix_a = se3_to_matrix(pose_a)
matrix_b = se3_to_matrix(pose_b)
# Assert
assert np.array_equal(matrix_a, matrix_b), (
"matrix_to_se3 -> se3_to_matrix must be byte-equal across calls"
)
def test_ac9_no_upward_imports_to_components() -> None:
"""The helper module MUST NOT import from gps_denied_onboard.components.*."""
# Arrange
module_path = (
Path(__file__).resolve().parents[2]
/ "src"
/ "gps_denied_onboard"
/ "helpers"
/ "se3_utils.py"
)
tree = ast.parse(module_path.read_text())
bad_imports: list[str] = []
# Act
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom):
module = node.module or ""
if module.startswith("gps_denied_onboard.components"):
bad_imports.append(module)
elif isinstance(node, ast.Import):
for alias in node.names:
if alias.name.startswith("gps_denied_onboard.components"):
bad_imports.append(alias.name)
# Assert
assert not bad_imports, (
f"helpers.se3_utils must not import from components.*; found {bad_imports}"
)
def test_is_valid_rotation_predicate() -> None:
# Arrange
R_good = np.eye(3, dtype=np.float64)
R_drift = np.eye(3, dtype=np.float64) + 1e-2 # off-orthogonal
R_mirror = np.diag([-1.0, 1.0, 1.0]).astype(np.float64)
R_f32 = np.eye(3, dtype=np.float32)
# Assert
assert is_valid_rotation(R_good) is True
assert is_valid_rotation(R_drift) is False
assert is_valid_rotation(R_mirror) is False
assert is_valid_rotation(R_f32) is False
assert is_valid_rotation("not an array") is False # type: ignore[arg-type]
def test_adjoint_shape() -> None:
# Arrange
T = _random_valid_T(RNG)
pose = matrix_to_se3(T)
# Act
ad = adjoint(pose)
# Assert
assert ad.shape == (6, 6)
assert ad.dtype == np.float64
def test_se3_is_gtsam_pose3_alias() -> None:
# Assert
assert SE3 is gtsam.Pose3
+214
View File
@@ -0,0 +1,214 @@
"""AC tests for AZ-280: Sha256Sidecar helper.
Verifies the `sha256_sidecar` contract v1.0.0 — atomic write, independent
verification, sidecar format strictness, aggregate determinism, and the
no-upward-imports invariant.
"""
from __future__ import annotations
import ast
import hashlib
from pathlib import Path
import pytest
from gps_denied_onboard.helpers import (
SIDECAR_SUFFIX,
Sha256Sidecar,
Sha256SidecarError,
)
def _payload(seed: int, size: int = 1 << 20) -> bytes:
"""1 MiB random-ish payload derived from a seed for reproducibility."""
import random
rng = random.Random(seed)
return bytes(rng.randint(0, 255) for _ in range(size))
def _sidecar(path: Path) -> Path:
return Path(str(path) + SIDECAR_SUFFIX)
def test_ac1_roundtrip_write_and_verify(tmp_path: Path) -> None:
# Arrange
payload = _payload(seed=1)
target = tmp_path / "artifact.bin"
# Act
written_digest = Sha256Sidecar.write_atomic_and_sidecar(target, payload)
# Assert
assert target.exists()
assert _sidecar(target).exists()
sidecar_text = _sidecar(target).read_text()
expected_digest = hashlib.sha256(payload).hexdigest()
assert sidecar_text == expected_digest
assert written_digest == expected_digest
assert Sha256Sidecar.verify(target) is True
def test_ac2_atomicity_no_partial_file_on_fault(tmp_path: Path, monkeypatch) -> None:
# Arrange
payload = b"hello"
target = tmp_path / "atomic.bin"
# Sabotage `os.replace` via the atomicwrites module so the rename step fails.
import atomicwrites
real_replace = atomicwrites.replace_atomic
def _failing_replace(src: str, dst: str) -> None:
raise OSError("simulated rename failure")
monkeypatch.setattr(atomicwrites, "replace_atomic", _failing_replace)
# Act + Assert — the write should raise; no half file should appear at the target.
with pytest.raises(Sha256SidecarError):
Sha256Sidecar.write_atomic(target, payload)
assert not target.exists(), "no partial file may exist at the target name on fault"
# Also confirm no orphaned `.tmp` siblings remain at the target name.
siblings = [p for p in tmp_path.iterdir() if p.name.startswith("atomic.bin")]
assert all(p == target for p in siblings if p.exists())
# Cleanup
monkeypatch.setattr(atomicwrites, "replace_atomic", real_replace)
def test_ac3_independent_verification_rejects_swapped_payload(tmp_path: Path) -> None:
# Arrange
payload = b"original payload"
target = tmp_path / "swap.bin"
Sha256Sidecar.write_atomic_and_sidecar(target, payload)
# Act — replace the artifact bytes out-of-band without updating the sidecar.
target.write_bytes(b"completely different bytes that are longer")
# Assert
assert Sha256Sidecar.verify(target) is False
def test_ac4_missing_sidecar_raises(tmp_path: Path) -> None:
# Arrange
payload = b"data"
target = tmp_path / "missing_sidecar.bin"
Sha256Sidecar.write_atomic_and_sidecar(target, payload)
_sidecar(target).unlink()
# Act + Assert
with pytest.raises(Sha256SidecarError) as excinfo:
Sha256Sidecar.verify(target)
assert "sidecar" in str(excinfo.value).lower()
def test_ac5_malformed_sidecar_rejected(tmp_path: Path) -> None:
# Arrange
payload = b"data"
target = tmp_path / "bad_sidecar.bin"
Sha256Sidecar.write_atomic_and_sidecar(target, payload)
_sidecar(target).write_text("not a hex digest at all")
# Act + Assert
with pytest.raises(Sha256SidecarError):
Sha256Sidecar.verify(target)
def test_ac6_aggregate_hash_order_deterministic(tmp_path: Path) -> None:
# Arrange
paths = []
for i in range(3):
p = tmp_path / f"file_{i}.bin"
Sha256Sidecar.write_atomic(p, f"payload-{i}".encode())
paths.append(p)
# Act
a = Sha256Sidecar.aggregate_hash(paths)
b = Sha256Sidecar.aggregate_hash(list(reversed(paths)))
c = Sha256Sidecar.aggregate_hash([paths[1], paths[0], paths[2]])
# Assert — same set, three permutations -> identical aggregate
assert a == b == c
def test_ac7_aggregate_hash_missing_path_raises(tmp_path: Path) -> None:
# Arrange
real = tmp_path / "real.bin"
Sha256Sidecar.write_atomic(real, b"data")
ghost = tmp_path / "ghost.bin" # never created
# Act + Assert
with pytest.raises(Sha256SidecarError) as excinfo:
Sha256Sidecar.aggregate_hash([real, ghost])
assert "ghost.bin" in str(excinfo.value)
def test_ac8_sidecar_format_strictness(tmp_path: Path) -> None:
# Arrange
payload = b"hello world"
target = tmp_path / "strict.bin"
Sha256Sidecar.write_atomic_and_sidecar(target, payload)
# Act
raw_bytes = _sidecar(target).read_bytes()
text = _sidecar(target).read_text()
# Assert — exactly 64 hex chars, no newline, no JSON wrapper, no whitespace.
assert len(raw_bytes) == 64, f"sidecar must be 64 bytes; got {len(raw_bytes)}"
assert not raw_bytes.endswith(b"\n"), "sidecar must NOT have a trailing newline"
assert text == text.strip(), "sidecar must contain no whitespace"
int(text, 16) # raises if not pure hex
assert text == hashlib.sha256(payload).hexdigest()
def test_ac9_no_upward_imports_to_components() -> None:
# Arrange
module_path = (
Path(__file__).resolve().parents[2]
/ "src"
/ "gps_denied_onboard"
/ "helpers"
/ "sha256_sidecar.py"
)
tree = ast.parse(module_path.read_text())
bad_imports: list[str] = []
# Act
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom):
module = node.module or ""
if module.startswith("gps_denied_onboard.components"):
bad_imports.append(module)
elif isinstance(node, ast.Import):
for alias in node.names:
if alias.name.startswith("gps_denied_onboard.components"):
bad_imports.append(alias.name)
# Assert
assert not bad_imports, (
f"helpers.sha256_sidecar must not import from components.*; found {bad_imports}"
)
def test_verify_on_missing_path_returns_false(tmp_path: Path) -> None:
"""Contract: `verify` distinguishes 'missing path' (False) from 'missing sidecar' (raise)."""
# Act + Assert
assert Sha256Sidecar.verify(tmp_path / "never_created.bin") is False
def test_aggregate_includes_all_file_digests(tmp_path: Path) -> None:
"""The aggregate must be sensitive to file content, not only path names."""
# Arrange
a = tmp_path / "a.bin"
b = tmp_path / "b.bin"
Sha256Sidecar.write_atomic(a, b"alpha")
Sha256Sidecar.write_atomic(b, b"beta")
digest_initial = Sha256Sidecar.aggregate_hash([a, b])
# Act — mutate b's content and recompute
Sha256Sidecar.write_atomic(b, b"gamma")
digest_after = Sha256Sidecar.aggregate_hash([a, b])
# Assert
assert digest_initial != digest_after
+10 -9
View File
@@ -1,9 +1,11 @@
"""Structured logging smoke — AZ-263 AC-7."""
"""Structured logging smoke — AZ-263 AC-7 (schema-compliant per AZ-266)."""
import io
import json
import logging
from gps_denied_onboard.logging import get_logger
from gps_denied_onboard.logging.structured import JsonFormatter
def test_get_logger_returns_logger_instance() -> None:
@@ -16,25 +18,24 @@ def test_get_logger_returns_logger_instance() -> None:
def test_log_lines_are_single_json_objects() -> None:
# Arrange
import io
from gps_denied_onboard.logging.structured import _JsonFormatter
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(_JsonFormatter())
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("test.json.unit")
logger.handlers = [handler]
logger.setLevel(logging.DEBUG)
logger.propagate = False
# Act
logger.error("hello world", extra={"event": "smoke", "value": 42})
logger.error(
"hello world",
extra={"kind": "test.smoke", "kv": {"event": "smoke", "value": 42}},
)
# Assert
line = stream.getvalue().strip().splitlines()[-1]
payload = json.loads(line)
assert payload["level"] == "ERROR"
assert payload["msg"] == "hello world"
assert payload["event"] == "smoke"
assert payload["value"] == 42
assert payload["kind"] == "test.smoke"
assert payload["kv"] == {"event": "smoke", "value": 42}