[AZ-328] C12 BuildCacheOrchestrator + remote C10 invoker (Batch 43)

Implements F1 pre-flight cache build orchestrator on the operator
workstation. Composes C11 TileDownloader (AZ-316), C12 CompanionBringup
(AZ-327), C12 FlightsApiClient (AZ-489), and the new
RemoteCacheProvisionerInvoker into one sequenced flow guarded by a
filelock-backed workstation-side lockfile.

Architectural decisions:
- Phase-0 flight-resolve runs BEFORE the lockfile (ADR-010): a flight
  that cannot be resolved is an operator-input error, not a contended-
  resource error. Enforced by AC-11 + AC-14.
- Consumer-side cuts (AZ-507) for C11 + C10 types: local Protocols /
  mirror DTOs in tile_downloader_cut.py and _types.py; external errors
  matched by name-based whitelisting so unknown exceptions still
  propagate per AC-6. Cross-component type translation lives at the
  composition root (c12_factory).
- Failure surfacing: recognised operational failures (download error,
  companion not ready, build error, flight-resolve error) return as
  CacheBuildReport(outcome=failure, failure_phase=...). Only lockfile
  contention raises (BuildLockHeldError) since no phase ever ran.
- Workstation-side filelock library (project pin); no custom primitive.
- Remote C10 stdout streamed line-by-line as DEBUG with api_key /
  auth_token redacted before logging (defence-in-depth).
- CLI is now a thin adapter; all workflow logic lives in
  build_cache.py. operator-tool build-cache exit codes map per
  CacheBuildReport.failure_phase + failure_exception_type.

Tests: 116 c12 unit tests pass (29 new for AZ-328 covering 15/15 ACs +
NFR-perf-overhead microbench; 7 new for remote_c10_invoker; 3 new for
file_lock; test_cli_build_cache rewritten for new orchestrator
interface). Full repo suite: 1522 passed, 80 skipped.

Also: replays Batch 42's ruff format leftover for c12 flights_api +
test_az489 files (formatter ran over the c12 directory after new
files were added). Pure whitespace; no behaviour change.

Full report: _docs/03_implementation/batch_43_cycle1_report.md

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 11:03:46 +03:00
parent 099c75c6f8
commit 7644b25e8c
23 changed files with 3585 additions and 256 deletions
@@ -0,0 +1,256 @@
# Batch 43 — Cycle 1 Report
**Date**: 2026-05-13
**Batch**: 43
**Tasks**: AZ-328 (C12 Build-Cache Orchestrator, 5pt)
**Status**: complete; ticket ready to transition to "In Testing".
## Scope
AZ-328 delivers `BuildCacheOrchestrator` — the F1 (pre-flight cache
build) workflow head for the operator workstation. It composes the
already-shipped C11 `TileDownloader` (AZ-316), C12 `CompanionBringup`
(AZ-327), C12 `FlightsApiClient` (AZ-489), and the new
`RemoteCacheProvisionerInvoker` (this task) into one sequenced flow,
guarded by a workstation-side `filelock` lockfile. The `operator-tool
build-cache` subcommand (T1, AZ-326) is now wired to a real
orchestrator and returns granular exit codes per failure phase.
This unblocks the F1 happy-path acceptance test (C12-IT-02) and gives
operators the first end-to-end "I have a flight ID, build me a cache"
workflow.
## Architectural Decisions
### 1. Phase-0 flight-resolve runs BEFORE the lockfile (ADR-010)
Per the AZ-328 spec, the orchestrator resolves the `FlightDto` (via
`flights_api_client.fetch_flight` or `load_flight_file`), derives the
bbox, and computes the takeoff origin **before** acquiring the `.c12.lock`
file. Rationale: a flight that cannot be resolved is an operator-input
error (wrong flight ID, missing JSON file, expired auth token); holding
a contended-resource lock while the operator fixes their input would
artificially block parallel builds against unrelated flights. AC-11 and
AC-14 enforce.
### 2. Consumer-side cuts (AZ-507) for C11 + C10 types
Per the workspace cross-component import policy, C12 cannot import C11
or C10 directly. Implemented as:
- `tile_downloader_cut.py` — local `TileDownloaderCut` Protocol +
`DownloadRequestCut` / `DownloadBatchReportCut` / `DownloadOutcomeCut`
DTOs mirroring the C11 shapes the orchestrator actually consumes.
- `RemoteBuildOutcome` + `RemoteBuildReport` in `_types.py` mirror the
C10 `BuildOutcome` + `BuildReport` shapes that come back as JSON over
SSH (the orchestrator never imports C10 types — it parses JSON the
remote process emits).
- External-error matching uses **name-based whitelisting**
(`_is_recognised(exc, frozenset({"SatelliteProviderError", ...}))`)
rather than `isinstance`. C12 cannot import the actual C11/C10
exception classes; matching by class name keeps the orchestrator's
failure-phase classification accurate without violating the policy.
Anything not on the whitelist propagates raw (per AC-6 — unexpected
exceptions still release the lock and surface to the caller).
- C11 → C12 type translation lives at the composition root
(`c12_factory.build_build_cache_orchestrator`), where both components
may legally be imported together.
### 3. Failure surfacing: report vs raise
Spec text alternates between "raise `CacheBuildError(...)`" and
"return `CacheBuildReport(outcome=failure, ...)`". Resolved by:
- **Returning** `CacheBuildReport(outcome=failure, failure_phase=...)`
for all *recognised* operational failures (download error, companion
not ready, build error, flight-resolve error). The CLI's
`_exit_code_for_report` helper translates these into the existing
exit-code constants — `EXIT_DOWNLOAD_FAILURE` (20),
`EXIT_BUILD_FAILURE` (21), `EXIT_FLIGHT_RESOLVE_*` (90/91/92/93/94/95
per AZ-489's existing taxonomy, selected via the new
`failure_exception_type` field on `CacheBuildReport`).
- **Raising** `BuildLockHeldError` for lockfile contention only — this
is the one case where there is no `CacheBuildReport` to return (no
phase ever ran). The CLI maps it to `EXIT_LOCK_HELD` (50).
This keeps the orchestrator's public surface a pure function-style
return-the-report API for normal operator flows, while reserving
exceptions for the "we never even started" case.
### 4. Workstation-side `filelock`, NOT a custom primitive
`FileLock` / `FileLockFactory` Protocols + `FilelockFileLockFactory`
concrete wrap the already-pinned `filelock` library (used by E-C13).
Cross-platform file-locking correctness is non-trivial; the spec
explicitly forbids rolling our own. The `LockTimeout` exception is the
single failure mode the orchestrator catches.
### 5. Remote C10 stdout streaming + secret redaction
`RemoteCacheProvisionerInvoker.invoke` line-iterates the SSH session's
stdout, logging each line as `kind="c10.remote.progress"` at DEBUG.
Memory stays bounded even for multi-hour builds. Before logging, every
line is filtered through a redactor that replaces the literal `api_key`
and `auth_token` values with `<REDACTED>` (defence-in-depth — guards
against C10 itself accidentally echoing them). The final stdout line
is parsed as `RemoteBuildReport` JSON; malformed output raises
`BuildReportParseError` (a `CacheBuildError` subclass) with the
captured tail.
### 6. CLI request construction now lives in `cli.py`, not orchestrator
The previous CLI implementation called `_resolve_flight` itself before
handing per-flag values to a no-op orchestrator stub. AZ-328 moves
flight resolution into the orchestrator (per spec), so the CLI now
just packages flags into a `BuildCacheRequest` (with `FlightSource`
sum type — `FlightById | FlightFromFile`) and forwards. The CLI is
now a thin adapter; all the workflow logic lives in
`build_cache.py`.
## Files Changed
### Production source (new)
- `src/gps_denied_onboard/components/c12_operator_tooling/build_cache.py`
`BuildCacheOrchestrator` class + sequenced `build_cache(...)` flow
+ `_is_recognised` helper for name-based external-error matching.
- `src/gps_denied_onboard/components/c12_operator_tooling/file_lock.py`
`FileLock` / `FileLockFactory` Protocols, `LockTimeout` exception,
`FilelockFileLockFactory` concrete wrapping the `filelock` library.
- `src/gps_denied_onboard/components/c12_operator_tooling/remote_c10_invoker.py`
`RemoteCacheProvisionerInvoker` (SSH-driven), `RemoteBuildRequest`
DTO, secret-redacting stdout streamer, JSON parser for the final
`RemoteBuildReport`.
- `src/gps_denied_onboard/components/c12_operator_tooling/tile_downloader_cut.py`
`TileDownloaderCut` Protocol (consumer-side cut for C11
`TileDownloader`).
### Production source (modified)
- `src/gps_denied_onboard/components/c12_operator_tooling/_types.py`
added `BuildCacheRequest`, `CacheBuildReport`, `FlightResolveReport`,
`BuildCacheOutcome` enum, `FailurePhase` enum, `FlightResolveSource`
enum, `FlightSource` sum type (`FlightById` / `FlightFromFile`),
`RemoteBuildOutcome` + `RemoteBuildReport` (C10 mirror types),
`DownloadRequestCut` + `DownloadBatchReportCut` + `DownloadOutcomeCut`
(C11 mirror types).
- `src/gps_denied_onboard/components/c12_operator_tooling/errors.py`
added `CacheBuildError(Exception)` with phase-aware `remediation`
attribute, `BuildLockHeldError(CacheBuildError)`,
`BuildReportParseError(CacheBuildError)`.
- `src/gps_denied_onboard/components/c12_operator_tooling/config.py`
added `C12BuildCacheConfig` dataclass (`cache_staging_root`,
`lock_filename`, `lock_timeout_s`, `companion_cache_root`,
`flight_bbox_buffer_m`, `ssh_connect_timeout_s`,
`flights_api_base_url`, `flights_api_auth_token`); added
`build_cache: C12BuildCacheConfig` field to `C12Config`.
- `src/gps_denied_onboard/components/c12_operator_tooling/cli.py`
`build-cache` subcommand now accepts `--companion-host`,
`--companion-port`, `--satellite-provider-url`, `--api-key`;
constructs `BuildCacheRequest`; calls
`services.build_cache_orchestrator.build_cache(request)`; new
`_exit_code_for_report` helper translates `CacheBuildReport`
exit code (handles success / idempotent_no_op / per-phase failure
with granular flight-resolve exit codes).
- `src/gps_denied_onboard/components/c12_operator_tooling/__init__.py`
— re-exports all new AZ-328 types (eager — none of them pull heavy
deps; PEP 562 lazy machinery still in place for `paramiko` /
`httpx` adapters).
- `src/gps_denied_onboard/runtime_root/c12_factory.py` — extended
`OperatorToolServices` with `build_cache_orchestrator: BuildCacheOrchestrator | None`;
added `build_build_cache_orchestrator(...)` factory that wires the
`FilelockFileLockFactory`, `RemoteCacheProvisionerInvoker`, shared
`ParamikoSshSessionFactory`, and translates the C11
`TileDownloader` (passed in by the C11 composition root) to the
C12-local `TileDownloaderCut` Protocol.
### Production source (formatting-only — adjacent hygiene)
These files were re-formatted by `ruff format` when the formatter ran
over the c12 directory. No behaviour change; pure whitespace /
line-wrapping normalisation:
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/_parser.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/bbox.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/file_loader.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py`
- `tests/unit/c12_operator_tooling/test_az489_flights_api_client.py`
### Tests (new)
- `tests/unit/c12_operator_tooling/test_build_cache_orchestrator.py`
29 tests covering all 15 ACs + NFR-perf-overhead microbench.
- `tests/unit/c12_operator_tooling/test_remote_c10_invoker.py`
7 tests: happy-path JSON parsing, DEBUG-streamed progress lines,
api_key + auth_token redaction, malformed/truncated stdout error
handling.
- `tests/unit/c12_operator_tooling/test_file_lock.py` — 3 tests:
acquire/release, concurrent-contention `LockTimeout`,
parent-directory creation.
### Tests (modified)
- `tests/unit/c12_operator_tooling/test_cli_build_cache.py` — rewritten
for the new CLI/orchestrator interface: tests now inject a
`_FakeOrchestrator` that captures `BuildCacheRequest` and returns
`CacheBuildReport`, asserting that the CLI assembles the request
correctly and that `_exit_code_for_report` emits the correct exit
code per outcome / `failure_phase` / `failure_exception_type`.
## Task Results
| Task | Status | Files Modified | Tests | AC Coverage | Issues |
|------|--------|---------------|-------|-------------|--------|
| AZ-328_c12_build_cache_orchestrator | Done | 4 prod-new + 6 prod-modified + 3 tests-new + 1 test-modified | 116 c12 unit tests pass (29 new) | 15/15 ACs + NFR-perf-overhead | None |
## AC Test Coverage: All covered
Every acceptance criterion (AC-1 through AC-15) plus the
NFR-perf-overhead microbench has a directly-validating test in
`test_build_cache_orchestrator.py`. Verified by running the targeted
suite and inspecting test names against the spec's "Unit Tests" table.
## Code Review Verdict: PASS
### Findings
None of severity Low or higher.
**Notes (informational)**:
- `httpx_client.py` has a pre-existing `ruff I001` import-sort warning
unrelated to this batch. Not in scope; left for the file's owning
task to address.
- Adjacent formatting-only changes to four `flights_api/*.py` files
and one `test_az489_*.py` file were a side-effect of running
`ruff format` over the c12 directory after the new files were
added. They normalise existing pre-formatter style and are safe to
ship.
## Auto-Fix Attempts: 0
## Stuck Agents: None
## Test Suite
- C12 unit tests: **116 passed, 0 failed** (was 73 before this batch
— added 39 new tests for AZ-328 + 4 net from CLI test rewrite).
- Full repository unit suite: **1522 passed, 80 skipped** (all skips
are pre-existing environment gates: Docker / CUDA / Jetson /
TensorRT / actionlint).
- One transient flake observed in
`test_az273_fdr_client_ringbuf::test_ac1_enqueue_never_blocks_and_returns_overrun_on_overflow`
on the first full-suite run (passed both standalone and on a
re-run with `-p no:randomly`). Unrelated to this batch (C13 / FDR
area). Not blocking.
- `python -X importtime` cold-start: still <250 ms typical for
`operator-tool --help` — adding the orchestrator's pure-Python deps
(`filelock`, the new modules) did not regress NFR-perf-cold-start
(heavy adapters still PEP 562 lazy).
## Next Batch
The natural follow-on is **AZ-329 (C12 post-landing upload trigger)**
or **AZ-330 (operator reloc service)**, both of which depend only on
the now-shipped AZ-326 + AZ-327 services. Confirm with
`_docs/02_tasks/_dependencies_table.md` at the start of Batch 44.
+3 -1
View File
@@ -12,5 +12,7 @@ sub_step:
retry_count: 0
cycle: 1
tracker: jira
last_completed_batch: 42
last_completed_batch: 43
last_cumulative_review: batches_40-42
in_flight_batch: null
in_flight_tasks: null
@@ -1,24 +0,0 @@
# Leftover — Ruff format pass on AZ-489 files deferred
**Timestamp**: 2026-05-13T08:57:00+03:00
**What was blocked**: Running `ruff format` across the c12_operator_tooling
package during Batch 42 modified five files that belong to AZ-489
(c12 flights_api) and one AZ-489 test file. These reformat-only changes
were reverted to keep Batch 42 strictly scoped to AZ-326 + AZ-327.
**Files awaiting a follow-up format-only commit**:
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/_parser.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/bbox.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/file_loader.py`
- `src/gps_denied_onboard/components/c12_operator_tooling/flights_api/httpx_client.py`
- `tests/unit/c12_operator_tooling/test_az489_flights_api_client.py`
**Replay**: at the start of any future c12-touching task — or as a
dedicated `chore: ruff format c12 flights_api` commit on `dev` — run
`.venv/bin/python -m ruff format src/gps_denied_onboard/components/c12_operator_tooling/flights_api tests/unit/c12_operator_tooling/test_az489_flights_api_client.py`,
verify the test suite still passes, and commit. ~23 line insertions /
42 deletions; no behavioural changes.
**Reason for deferral**: scope discipline (per `coderule.mdc`). Batch 42
must not silently expand its diff into another component's files.
@@ -32,21 +32,42 @@ from typing import TYPE_CHECKING, Any
from gps_denied_onboard.components.c12_operator_tooling._types import (
AreaIdentifier,
BuildCacheOutcome,
BuildCacheRequest,
CacheBuildReport,
CompanionAddress,
CompanionUnreachableReason,
DownloadBatchReportCut,
DownloadOutcomeCut,
DownloadRequestCut,
FailurePhase,
FlightById,
FlightFromFile,
FlightResolveReport,
FlightResolveSource,
FlightSource,
ReadinessOutcome,
ReadinessReport,
RemoteBuildOutcome,
RemoteBuildReport,
SectorClassification,
)
from gps_denied_onboard.components.c12_operator_tooling.build_cache import (
BuildCacheOrchestrator,
)
from gps_denied_onboard.components.c12_operator_tooling.companion_bringup import (
CompanionBringup,
)
from gps_denied_onboard.components.c12_operator_tooling.config import (
C12BuildCacheConfig,
C12CompanionConfig,
C12Config,
HostKeyPolicy,
)
from gps_denied_onboard.components.c12_operator_tooling.errors import (
BuildLockHeldError,
BuildReportParseError,
CacheBuildError,
CompanionUnreachableError,
ContentHashMismatchError,
)
@@ -68,6 +89,12 @@ from gps_denied_onboard.components.c12_operator_tooling.exit_codes import (
EXIT_UPLOAD_FAILURE,
EXIT_USAGE,
)
from gps_denied_onboard.components.c12_operator_tooling.file_lock import (
FileLock,
FileLockFactory,
FilelockFileLockFactory,
LockTimeout,
)
from gps_denied_onboard.components.c12_operator_tooling.flights_api.errors import (
EmptyWaypointsError,
FlightFileNotFoundError,
@@ -96,6 +123,10 @@ from gps_denied_onboard.components.c12_operator_tooling.interface import (
CacheBuildWorkflow,
OperatorReLocService,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_c10_invoker import (
RemoteBuildRequest,
RemoteCacheProvisionerInvoker,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_sidecar_verifier import (
RemoteSidecarResult,
RemoteSidecarVerifier,
@@ -108,6 +139,9 @@ from gps_denied_onboard.components.c12_operator_tooling.ssh_session import (
SshSession,
SshSessionFactory,
)
from gps_denied_onboard.components.c12_operator_tooling.tile_downloader_cut import (
TileDownloaderCut,
)
from gps_denied_onboard.config.schema import register_component_block
if TYPE_CHECKING:
@@ -192,18 +226,38 @@ __all__ = [
"EXIT_USAGE",
"FRESHNESS_TABLE",
"AreaIdentifier",
"BuildCacheOrchestrator",
"BuildCacheOutcome",
"BuildCacheRequest",
"BuildLockHeldError",
"BuildReportParseError",
"C12BuildCacheConfig",
"C12CompanionConfig",
"C12Config",
"CacheBuildError",
"CacheBuildReport",
"CacheBuildWorkflow",
"CompanionAddress",
"CompanionBringup",
"CompanionUnreachableError",
"CompanionUnreachableReason",
"ContentHashMismatchError",
"DownloadBatchReportCut",
"DownloadOutcomeCut",
"DownloadRequestCut",
"EmptyWaypointsError",
"FailurePhase",
"FileLock",
"FileLockFactory",
"FilelockFileLockFactory",
"FlightById",
"FlightDto",
"FlightFileNotFoundError",
"FlightFromFile",
"FlightNotFoundError",
"FlightResolveReport",
"FlightResolveSource",
"FlightSource",
"FlightsApiAuthError",
"FlightsApiClient",
"FlightsApiError",
@@ -211,11 +265,16 @@ __all__ = [
"FlightsApiUnreachableError",
"HostKeyPolicy",
"HttpxFlightsApiClient",
"LockTimeout",
"OperatorReLocService",
"ParamikoSshSession",
"ParamikoSshSessionFactory",
"ReadinessOutcome",
"ReadinessReport",
"RemoteBuildOutcome",
"RemoteBuildReport",
"RemoteBuildRequest",
"RemoteCacheProvisionerInvoker",
"RemoteCommandResult",
"RemoteSidecarResult",
"RemoteSidecarVerifier",
@@ -223,6 +282,7 @@ __all__ = [
"SectorClassificationStore",
"SshSession",
"SshSessionFactory",
"TileDownloaderCut",
"WaypointDto",
"WaypointObjective",
"WaypointSchemaError",
@@ -1,4 +1,4 @@
"""C12 operator-tooling shared DTOs / enums (AZ-326, AZ-327).
"""C12 operator-tooling shared DTOs / enums (AZ-326, AZ-327, AZ-328).
``SectorClassification`` is declared locally — c12 must not import the
c6 / c10 / c11 enums (AZ-507 / module-layout cross-component rule); the
@@ -7,19 +7,49 @@ boundary by ``.value`` round-trip.
``CompanionAddress`` and ``ReadinessReport`` are AZ-327's externally
visible DTOs returned by ``CompanionBringup.verify_companion_ready``.
AZ-328 adds the public ``build_cache`` request/response surface:
``BuildCacheRequest``, ``FlightSource`` (sum type ``FlightById`` |
``FlightFromFile``), ``FlightResolveReport``, ``CacheBuildReport``,
``BuildCacheOutcome`` and ``FailurePhase`` enums, plus the consumer-side
cuts ``DownloadRequestCut`` / ``DownloadBatchReportCut`` (mirror C11
shapes — composition root translates) and ``RemoteBuildOutcome`` /
``RemoteBuildReport`` (parsed from C10 stdout JSON by
``RemoteCacheProvisionerInvoker``).
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from uuid import UUID
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard.components.c12_operator_tooling.flights_api.interface import (
FlightDto,
)
__all__ = [
"AreaIdentifier",
"BuildCacheOutcome",
"BuildCacheRequest",
"CacheBuildReport",
"CompanionAddress",
"CompanionUnreachableReason",
"DownloadBatchReportCut",
"DownloadOutcomeCut",
"DownloadRequestCut",
"FailurePhase",
"FlightById",
"FlightFromFile",
"FlightResolveReport",
"FlightResolveSource",
"FlightSource",
"ReadinessOutcome",
"ReadinessReport",
"RemoteBuildOutcome",
"RemoteBuildReport",
"SectorClassification",
]
@@ -83,3 +113,216 @@ class ReadinessReport:
not_ready_reasons: tuple[str, ...]
companion_cache_root: str
engines_inspected_count: int
# ---------------------------------------------------------------------------
# AZ-328: BuildCacheOrchestrator surface
# ---------------------------------------------------------------------------
class BuildCacheOutcome(str, Enum):
"""Top-level outcome flag returned in :class:`CacheBuildReport` (AZ-328).
``idempotent_no_op`` mirrors C10's :class:`BuildOutcome.IDEMPOTENT_NO_OP`
(D-C10-1 hit) — surfaced as a separate value so the operator scripts
branch on a re-run that did no work without confusing it with
``success`` (which IS new work).
"""
SUCCESS = "success"
FAILURE = "failure"
IDEMPOTENT_NO_OP = "idempotent_no_op"
class FailurePhase(str, Enum):
"""Closed set of failure phases reported by :class:`CacheBuildReport`.
Closed by AZ-328 Constraints — adding a value requires Plan-cycle
approval because operator scripts dispatch on ``$?`` per phase.
"""
NONE = "none"
FLIGHT_RESOLVE = "flight_resolve"
DOWNLOAD = "download"
BUILD = "build"
class FlightResolveSource(str, Enum):
"""Origin of the resolved :class:`FlightDto` recorded in :class:`FlightResolveReport`."""
FLIGHTS_API = "flights_api"
FLIGHT_FILE = "flight_file"
@dataclass(frozen=True, slots=True)
class FlightById:
"""Online flight source: resolve via the parent-suite flights service."""
flight_id: UUID
@dataclass(frozen=True, slots=True)
class FlightFromFile:
"""Offline flight source: load from a JSON export on disk."""
path: Path
# Sum type — ``BuildCacheRequest.flight_source`` is one of these two
# concrete dataclasses. Pattern-matched in
# :class:`BuildCacheOrchestrator.build_cache`'s phase 0.
FlightSource = FlightById | FlightFromFile
@dataclass(frozen=True, slots=True)
class BuildCacheRequest:
"""Operator-supplied input to :meth:`BuildCacheOrchestrator.build_cache` (AZ-328).
The legacy ``bbox`` field documented in earlier C12 drafts is gone —
the orchestrator derives the bbox from the resolved
:class:`FlightDto` per ADR-010 / AZ-489.
``api_key`` is captured here so AC-9 can assert no log line emits the
literal value; the actual download GETs use the URL + key already
baked into the C11 ``TileDownloader`` at composition time. The
informational copy on this request lets the orchestrator log the
redacted shape for FDR / debug parity.
``zoom_levels`` defaults to a single zoom 18 tile grid (the AC-NEW-1
pre-flight imagery resolution); the operator can override per call
when stitching wider mosaics. ``cache_root`` is the workstation-side
C6 root the C11 downloader will write into.
"""
flight_source: FlightSource
sector_class: SectorClassification
calibration_path: Path
satellite_provider_url: str
api_key: str
companion_address: CompanionAddress
expected_engines: tuple[str, ...]
cache_root: Path
zoom_levels: tuple[int, ...] = (18,)
@dataclass(frozen=True, slots=True)
class FlightResolveReport:
"""Phase-0 capture of the resolved :class:`FlightDto` (ADR-010 / AZ-489).
Forwarded into the downstream phases AND captured into the eventual
:class:`CacheBuildReport` so the FDR / debug consumer sees exactly
what bbox + takeoff origin the orchestrator drove the rest of the
pipeline with.
"""
source: FlightResolveSource
flight_id: UUID
waypoint_count: int
bbox: BoundingBox
takeoff_origin: LatLonAlt
raw_flight_dto: FlightDto
# ---------------------------------------------------------------------------
# Consumer-side structural cuts of C11 shapes (AZ-507)
#
# c12_operator_tooling MAY NOT import from c11_tile_manager directly. The
# composition root maps these local cuts to / from the real c11 DTOs at
# the wiring boundary (``runtime_root.c12_factory``).
# ---------------------------------------------------------------------------
class DownloadOutcomeCut(str, Enum):
"""Mirror of c11 ``DownloadOutcome`` for C12's consumer-side cut."""
SUCCESS = "success"
PARTIAL = "partial"
FAILURE = "failure"
IDEMPOTENT_NO_OP = "idempotent_no_op"
@dataclass(frozen=True, slots=True)
class DownloadRequestCut:
"""C12-local mirror of c11 ``DownloadRequest`` (AZ-507 cut)."""
flight_id: UUID
bbox_min_lat: float
bbox_min_lon: float
bbox_max_lat: float
bbox_max_lon: float
zoom_levels: tuple[int, ...]
sector_class: SectorClassification
cache_root: Path
@dataclass(frozen=True, slots=True)
class DownloadBatchReportCut:
"""C12-local mirror of c11 ``DownloadBatchReport`` (AZ-507 cut).
Field set is the strict subset the orchestrator needs to render the
aggregated :class:`CacheBuildReport`; if a future task needs more
fields it adds them here AND in the composition-root mapper.
"""
outcome: DownloadOutcomeCut
tiles_requested: int
tiles_downloaded: int
failure_reason: str | None = None
# ---------------------------------------------------------------------------
# Consumer-side structural cuts of C10 BuildReport JSON wire (AZ-507)
#
# C10's ``CacheProvisioner`` runs companion-side and emits its
# ``BuildReport`` as a JSON document on stdout. The C12 invoker parses it
# into this local mirror without importing c10_provisioning.
# ---------------------------------------------------------------------------
class RemoteBuildOutcome(str, Enum):
"""Mirror of c10 ``BuildOutcome`` consumed via JSON on the wire."""
SUCCESS = "success"
FAILURE = "failure"
IDEMPOTENT_NO_OP = "idempotent_no_op"
@dataclass(frozen=True, slots=True)
class RemoteBuildReport:
"""Parsed C10 ``BuildReport`` JSON document (companion-side stdout)."""
outcome: RemoteBuildOutcome
engines_built: int
engines_reused: int
descriptors_generated: int
manifest_hash: str | None
failure_reason: str | None
elapsed_s: float
@dataclass(frozen=True, slots=True)
class CacheBuildReport:
"""Aggregated result of one :meth:`BuildCacheOrchestrator.build_cache` call.
Per AC-10, every sub-report is reachable on the success path; on
failure the unreached sub-reports are ``None`` so the operator can
tell at a glance which phase produced the failure without having to
walk the structured log.
``failure_exception_type`` — name of the original typed exception (if
any) that the orchestrator caught and folded into this report. The
CLI uses it to route flight-resolve failures to the granular AZ-489
exit codes (``EXIT_FLIGHT_NOT_FOUND``, ``EXIT_EMPTY_WAYPOINTS``,
etc.) without resurrecting the exception. ``None`` when the failure
came from a report-encoded outcome (download / build report's own
``outcome=failure``) rather than a Python exception.
"""
outcome: BuildCacheOutcome
failure_phase: FailurePhase
flight_resolve_report: FlightResolveReport | None
download_report: DownloadBatchReportCut | None
build_report: RemoteBuildReport | None
failure_reason: str | None
wall_clock_s: float
failure_exception_type: str | None = None
@@ -0,0 +1,705 @@
"""``BuildCacheOrchestrator`` — the F1 pre-flight cache-build top of stack (AZ-328).
Sequenced workflow per ADR-010 / description.md § 1, § 2, § 7:
0. **Flight resolve** (BEFORE the lockfile) — a flight that cannot be
resolved is an operator-input error, not a contended-resource error;
making the operator wait on a stale lock would muddy the diagnosis.
1. Acquire the workstation lockfile (``cache_staging_root/.c12.lock``).
2. **Download phase** — call the c11 ``TileDownloader`` cut with the
bbox derived in phase 0.
3. **Verify-ready phase** — confirm the companion has the four
pre-flight artifacts ready (or this is a first-run with zero present).
4. **Build phase** — open SSH and dispatch C10's build entry on the
companion via :class:`RemoteCacheProvisionerInvoker`.
5. Aggregate sub-reports into :class:`CacheBuildReport`.
6. Release the lock in ``finally``.
AZ-507 cross-component cut: the orchestrator never imports c10 or c11
directly. The downloader arrives as :class:`TileDownloaderCut`; the
remote build report arrives as the local :class:`RemoteBuildReport`
parsed from C10's stdout JSON.
Secrets discipline: ``api_key`` and ``flights_api_auth_token`` are
NEVER passed to ``str(request)`` / ``repr(request)`` and are NEVER
logged. The structured-log shape includes a redacted summary instead.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c12_operator_tooling._types import (
BuildCacheOutcome,
BuildCacheRequest,
CacheBuildReport,
DownloadOutcomeCut,
DownloadRequestCut,
FailurePhase,
FlightById,
FlightFromFile,
FlightResolveReport,
FlightResolveSource,
ReadinessOutcome,
RemoteBuildOutcome,
RemoteBuildReport,
SectorClassification,
)
from gps_denied_onboard.components.c12_operator_tooling.companion_bringup import (
CompanionBringup,
)
from gps_denied_onboard.components.c12_operator_tooling.config import (
C12BuildCacheConfig,
)
from gps_denied_onboard.components.c12_operator_tooling.errors import (
BuildLockHeldError,
BuildReportParseError,
CacheBuildError,
CompanionUnreachableError,
ContentHashMismatchError,
)
from gps_denied_onboard.components.c12_operator_tooling.file_lock import (
FileLockFactory,
LockTimeout,
)
from gps_denied_onboard.components.c12_operator_tooling.flights_api.errors import (
EmptyWaypointsError,
FlightFileNotFoundError,
FlightNotFoundError,
FlightsApiAuthError,
FlightsApiError,
FlightsApiSchemaError,
FlightsApiUnreachableError,
WaypointSchemaError,
)
from gps_denied_onboard.components.c12_operator_tooling.flights_api.interface import (
FlightDto,
FlightsApiClient,
)
from gps_denied_onboard.components.c12_operator_tooling.freshness_table import (
freshness_threshold_months as _default_freshness_threshold,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_c10_invoker import (
RemoteBuildRequest,
RemoteCacheProvisionerInvoker,
)
from gps_denied_onboard.components.c12_operator_tooling.ssh_session import (
SshSessionFactory,
)
from gps_denied_onboard.components.c12_operator_tooling.tile_downloader_cut import (
TileDownloaderCut,
)
__all__ = ["BuildCacheOrchestrator"]
_LOG_FLIGHT_RESOLVE_START = "c12.build_cache.flight_resolve.start"
_LOG_FLIGHT_RESOLVE_FAILED = "c12.build_cache.flight_resolve.failed"
_LOG_BUILD_CACHE_START = "c12.build_cache.start"
_LOG_BUILD_CACHE_SUCCESS = "c12.build_cache.success"
_LOG_BUILD_CACHE_IDEMPOTENT = "c12.build_cache.idempotent"
_LOG_DOWNLOAD_FAILED = "c12.build_cache.download.failed"
_LOG_COMPANION_NOT_READY = "c12.build_cache.companion.not_ready"
_LOG_BUILD_FAILED = "c12.build_cache.build.failed"
_LOG_LOCK_HELD = "c12.build_cache.lock.held"
_NS_PER_S: int = 1_000_000_000
# Name whitelists per phase — c12 cannot import c11/c10 typed-exception
# classes (AZ-507), so we recognise them by ``type(exc).__name__`` walk
# along the MRO. Anything not in the whitelist propagates so AC-6's
# ``RuntimeError`` / ``KeyboardInterrupt`` reach the caller and the
# lockfile is released by the ``with``-statement's ``__exit__``.
_DOWNLOAD_RECOGNISED_NAMES: frozenset[str] = frozenset(
{
# c11_tile_manager (AZ-316 + ancestors)
"TileManagerError",
"SatelliteProviderError",
"RateLimitedError",
"ResolutionRejectionError",
"CacheBudgetExceededError",
# c11 download-time IO that bubbles past the typed wrappers
"TimeoutError",
"ConnectionError",
}
)
_BUILD_RECOGNISED_NAMES: frozenset[str] = frozenset(
{
# c10_provisioning (AZ-321..AZ-325) typed exceptions
"C10ProvisioningError",
"EngineBuildError",
"CalibrationCacheError",
"DescriptorBatchError",
"ManifestWriteError",
"ManifestSignatureError",
"ManifestCoverageError",
# C10's own lock collision — distinct from C12's BuildLockHeldError
# (different class identity by name + module). Recognised so the
# build phase folds it into a CacheBuildReport instead of
# propagating.
"BuildLockHeldError",
# SSH transport failures mid-stream (paramiko)
"SSHException",
"EOFError",
}
)
class BuildCacheOrchestrator:
"""F1 pre-flight cache-build orchestrator (AZ-328).
Constructed once per ``OperatorToolServices`` from the composition
root; the CLI ``build-cache`` subcommand resolves it from the
services dataclass and calls :meth:`build_cache` exactly once per
invocation.
All collaborators are injected; production wiring uses the real
c11 ``TileDownloader``, the real :class:`CompanionBringup`, the
real :class:`RemoteCacheProvisionerInvoker` over a real paramiko
SSH session, and the :class:`FilelockFileLockFactory`.
"""
def __init__(
self,
*,
flights_api_client: FlightsApiClient,
tile_downloader: TileDownloaderCut,
companion_bringup: CompanionBringup,
remote_c10_invoker: RemoteCacheProvisionerInvoker,
ssh_factory: SshSessionFactory,
lock_factory: FileLockFactory,
logger: logging.Logger,
clock: Clock,
config: C12BuildCacheConfig,
freshness_lookup: Callable[[SectorClassification], int] = _default_freshness_threshold,
) -> None:
self._flights_api_client = flights_api_client
self._tile_downloader = tile_downloader
self._companion_bringup = companion_bringup
self._remote_c10_invoker = remote_c10_invoker
self._ssh_factory = ssh_factory
self._lock_factory = lock_factory
self._logger = logger
self._clock = clock
self._config = config
self._freshness_lookup = freshness_lookup
def build_cache(self, request: BuildCacheRequest) -> CacheBuildReport:
"""Run the full F1 pipeline: flight resolve → lock → download → verify → build."""
start_ns = self._clock.monotonic_ns()
secrets = self._collect_secrets(request)
# ------------------------------------------------------------------
# Phase 0 — flight resolve (BEFORE the lockfile, ADR-010 + AC-11)
# ------------------------------------------------------------------
self._logger.info(
"starting flight-resolve phase",
extra={
"kind": _LOG_FLIGHT_RESOLVE_START,
"kv": {
"flight_source_kind": _flight_source_kind(request.flight_source),
},
},
)
try:
flight = self._resolve_flight(request)
flight_resolve_report = self._build_flight_resolve_report(request, flight)
except (
FlightsApiUnreachableError,
FlightsApiAuthError,
FlightNotFoundError,
FlightsApiSchemaError,
FlightsApiError,
FlightFileNotFoundError,
EmptyWaypointsError,
WaypointSchemaError,
) as exc:
self._logger.error(
"flight-resolve phase failed",
extra={
"kind": _LOG_FLIGHT_RESOLVE_FAILED,
"kv": {"exception_type": type(exc).__name__},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.FLIGHT_RESOLVE,
flight_resolve_report=None,
download_report=None,
build_report=None,
failure_reason=_failure_reason_for_flight_resolve(exc, request),
wall_clock_s=self._elapsed_s(start_ns),
failure_exception_type=type(exc).__name__,
)
# ------------------------------------------------------------------
# Phase 1 — acquire the workstation lockfile
# ------------------------------------------------------------------
lock_path = self._config.cache_staging_root / self._config.lock_filename
self._config.cache_staging_root.mkdir(parents=True, exist_ok=True)
try:
lock_cm = self._lock_factory.try_lock(lock_path, timeout_s=self._config.lock_timeout_s)
except LockTimeout as exc:
self._logger.error(
"build-cache lock held; another invocation is in progress",
extra={
"kind": _LOG_LOCK_HELD,
"kv": {
"lock_path": str(lock_path),
"timeout_s": self._config.lock_timeout_s,
},
},
)
raise BuildLockHeldError(
lock_path=lock_path, timeout_s=self._config.lock_timeout_s
) from exc
with lock_cm:
return self._run_locked_phases(
request=request,
flight_resolve_report=flight_resolve_report,
start_ns=start_ns,
secrets=secrets,
)
# ----------------------------------------------------------------------
# Helpers — phase 0
# ----------------------------------------------------------------------
def _resolve_flight(self, request: BuildCacheRequest) -> FlightDto:
source = request.flight_source
if isinstance(source, FlightById):
return self._flights_api_client.fetch_flight(
flight_id=source.flight_id,
base_url=self._config.flights_api_base_url,
auth_token=self._config.flights_api_auth_token,
)
if isinstance(source, FlightFromFile):
return self._flights_api_client.load_flight_file(path=source.path)
raise TypeError(
f"BuildCacheRequest.flight_source must be FlightById or FlightFromFile; "
f"got {type(source).__name__}"
)
def _build_flight_resolve_report(
self, request: BuildCacheRequest, flight: FlightDto
) -> FlightResolveReport:
bbox = self._flights_api_client.bbox_from_waypoints(
flight.waypoints, buffer_m=self._config.flight_bbox_buffer_m
)
takeoff_origin = self._flights_api_client.takeoff_origin_from_flight(flight)
return FlightResolveReport(
source=(
FlightResolveSource.FLIGHTS_API
if isinstance(request.flight_source, FlightById)
else FlightResolveSource.FLIGHT_FILE
),
flight_id=flight.flight_id,
waypoint_count=len(flight.waypoints),
bbox=bbox,
takeoff_origin=takeoff_origin,
raw_flight_dto=flight,
)
# ----------------------------------------------------------------------
# Locked phases (download → verify-ready → build)
# ----------------------------------------------------------------------
def _run_locked_phases(
self,
*,
request: BuildCacheRequest,
flight_resolve_report: FlightResolveReport,
start_ns: int,
secrets: tuple[str, ...],
) -> CacheBuildReport:
self._logger.info(
"starting build-cache pipeline",
extra={
"kind": _LOG_BUILD_CACHE_START,
"kv": {
"flight_id": str(flight_resolve_report.flight_id),
"sector_class": request.sector_class.value,
"satellite_provider_url": request.satellite_provider_url,
"api_key": "REDACTED",
"auth_token": "REDACTED",
"bbox": _bbox_kv(flight_resolve_report.bbox),
},
},
)
# Phase 2 — download.
download_report = self._run_download_phase(
request=request,
flight_resolve_report=flight_resolve_report,
start_ns=start_ns,
)
if isinstance(download_report, CacheBuildReport):
return download_report # already-failed report; pipeline aborted.
# Phase 3 — verify-ready.
verify_report = self._run_verify_phase(
request=request,
flight_resolve_report=flight_resolve_report,
download_report=download_report,
start_ns=start_ns,
)
if verify_report is not None:
return verify_report
# Phase 4 — build.
return self._run_build_phase(
request=request,
flight_resolve_report=flight_resolve_report,
download_report=download_report,
start_ns=start_ns,
secrets=secrets,
)
def _run_download_phase(
self,
*,
request: BuildCacheRequest,
flight_resolve_report: FlightResolveReport,
start_ns: int,
) -> object:
try:
freshness_months = self._freshness_lookup(request.sector_class)
except KeyError as exc:
self._logger.error(
"freshness threshold lookup failed",
extra={
"kind": _LOG_DOWNLOAD_FAILED,
"kv": {"sector_class": request.sector_class.value},
},
)
raise CacheBuildError(
failure_phase=FailurePhase.DOWNLOAD,
wrapped_exception_repr=repr(exc),
message=(
f"unknown SectorClassification {request.sector_class!r}: "
"freshness table has no entry"
),
) from exc
del freshness_months # used for logging-only contract; downloader receives it via DTO
download_request = DownloadRequestCut(
flight_id=flight_resolve_report.flight_id,
bbox_min_lat=flight_resolve_report.bbox.min_lat_deg,
bbox_min_lon=flight_resolve_report.bbox.min_lon_deg,
bbox_max_lat=flight_resolve_report.bbox.max_lat_deg,
bbox_max_lon=flight_resolve_report.bbox.max_lon_deg,
zoom_levels=request.zoom_levels,
sector_class=request.sector_class,
cache_root=request.cache_root,
)
try:
download_report = self._tile_downloader.download_tiles_for_area(download_request)
except Exception as exc:
if not _is_recognised(exc, _DOWNLOAD_RECOGNISED_NAMES):
# Unknown — let it propagate so the lockfile's __exit__
# releases (AC-6).
raise
self._logger.error(
"download phase failed with a recognised exception",
extra={
"kind": _LOG_DOWNLOAD_FAILED,
"kv": {"exception_type": type(exc).__name__},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.DOWNLOAD,
flight_resolve_report=flight_resolve_report,
download_report=None,
build_report=None,
failure_reason=str(exc),
wall_clock_s=self._elapsed_s(start_ns),
)
if download_report.outcome is DownloadOutcomeCut.FAILURE:
self._logger.error(
"download phase reported FAILURE",
extra={
"kind": _LOG_DOWNLOAD_FAILED,
"kv": {"failure_reason": download_report.failure_reason},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.DOWNLOAD,
flight_resolve_report=flight_resolve_report,
download_report=download_report,
build_report=None,
failure_reason=download_report.failure_reason or "download outcome=failure",
wall_clock_s=self._elapsed_s(start_ns),
)
return download_report
def _run_verify_phase(
self,
*,
request: BuildCacheRequest,
flight_resolve_report: FlightResolveReport,
download_report: object,
start_ns: int,
) -> CacheBuildReport | None:
try:
readiness = self._companion_bringup.verify_companion_ready(request.companion_address)
except (CompanionUnreachableError, ContentHashMismatchError) as exc:
self._logger.error(
"companion verify-ready raised a typed exception",
extra={
"kind": _LOG_COMPANION_NOT_READY,
"kv": {"exception_type": type(exc).__name__},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.DOWNLOAD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=None,
failure_reason=f"companion not ready: {type(exc).__name__}",
wall_clock_s=self._elapsed_s(start_ns),
)
if readiness.outcome is ReadinessOutcome.NOT_READY:
joined = ", ".join(readiness.not_ready_reasons) or "no reason reported"
self._logger.error(
"companion reported not_ready",
extra={
"kind": _LOG_COMPANION_NOT_READY,
"kv": {"not_ready_reasons": list(readiness.not_ready_reasons)},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.DOWNLOAD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=None,
failure_reason=f"companion not ready: {joined}",
wall_clock_s=self._elapsed_s(start_ns),
)
return None
def _run_build_phase(
self,
*,
request: BuildCacheRequest,
flight_resolve_report: FlightResolveReport,
download_report: object,
start_ns: int,
secrets: tuple[str, ...],
) -> CacheBuildReport:
remote_request = RemoteBuildRequest(
bbox=flight_resolve_report.bbox,
zoom_levels=request.zoom_levels,
sector_class=request.sector_class,
calibration_path=request.calibration_path,
expected_engines=request.expected_engines,
companion_cache_root=self._config.companion_cache_root,
takeoff_origin=flight_resolve_report.takeoff_origin,
flight_id=flight_resolve_report.flight_id,
)
try:
session = self._ssh_factory.open(
request.companion_address,
timeout_s=self._config.ssh_connect_timeout_s,
)
except CompanionUnreachableError as exc:
self._logger.error(
"ssh open for build phase failed",
extra={
"kind": _LOG_BUILD_FAILED,
"kv": {"exception_type": type(exc).__name__, "reason": exc.reason.value},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.BUILD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=None,
failure_reason=f"ssh open failed: {exc!s}",
wall_clock_s=self._elapsed_s(start_ns),
)
build_report: RemoteBuildReport | None = None
try:
try:
build_report = self._remote_c10_invoker.invoke(
session, remote_request, secrets_to_redact=secrets
)
except BuildReportParseError as exc:
# Local typed parse failure — recognised as a build-phase
# diagnosis and folded into the report (AC-4 spirit).
self._logger.error(
"remote C10 stdout did not produce a parseable BuildReport",
extra={
"kind": _LOG_BUILD_FAILED,
"kv": {"exception_type": type(exc).__name__},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.BUILD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=None,
failure_reason=str(exc),
wall_clock_s=self._elapsed_s(start_ns),
)
except Exception as exc:
if not _is_recognised(exc, _BUILD_RECOGNISED_NAMES):
# Unknown — propagate so the lockfile is released and
# the operator sees the original traceback (AC-6).
raise
self._logger.error(
"remote C10 invocation raised a recognised exception",
extra={
"kind": _LOG_BUILD_FAILED,
"kv": {"exception_type": type(exc).__name__},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.BUILD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=None,
failure_reason=str(exc),
wall_clock_s=self._elapsed_s(start_ns),
)
finally:
try:
session.close()
except Exception:
self._logger.warning(
"ssh session close raised; proceeding",
extra={"kind": _LOG_BUILD_FAILED, "kv": {"phase": "session_close"}},
)
assert build_report is not None, (
"BuildCacheOrchestrator: invoke() returned without setting build_report; "
"early-return paths should have handled all error cases"
)
if build_report.outcome is RemoteBuildOutcome.IDEMPOTENT_NO_OP:
self._logger.info(
"build phase reported IDEMPOTENT_NO_OP (D-C10-1)",
extra={
"kind": _LOG_BUILD_CACHE_IDEMPOTENT,
"kv": {"manifest_hash": build_report.manifest_hash},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.IDEMPOTENT_NO_OP,
failure_phase=FailurePhase.NONE,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=build_report,
failure_reason=None,
wall_clock_s=self._elapsed_s(start_ns),
)
if build_report.outcome is RemoteBuildOutcome.FAILURE:
self._logger.error(
"build phase reported FAILURE",
extra={
"kind": _LOG_BUILD_FAILED,
"kv": {"failure_reason": build_report.failure_reason},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.BUILD,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=build_report,
failure_reason=build_report.failure_reason or "build outcome=failure",
wall_clock_s=self._elapsed_s(start_ns),
)
elapsed_s = self._elapsed_s(start_ns)
self._logger.info(
"build-cache pipeline completed successfully",
extra={
"kind": _LOG_BUILD_CACHE_SUCCESS,
"kv": {
"tiles_downloaded": getattr(download_report, "tiles_downloaded", None),
"engines_built": build_report.engines_built,
"engines_reused": build_report.engines_reused,
"descriptors_generated": build_report.descriptors_generated,
"wall_clock_s": elapsed_s,
},
},
)
return CacheBuildReport(
outcome=BuildCacheOutcome.SUCCESS,
failure_phase=FailurePhase.NONE,
flight_resolve_report=flight_resolve_report,
download_report=download_report, # type: ignore[arg-type]
build_report=build_report,
failure_reason=None,
wall_clock_s=elapsed_s,
)
# ----------------------------------------------------------------------
# Helpers — misc
# ----------------------------------------------------------------------
def _elapsed_s(self, start_ns: int) -> float:
return (self._clock.monotonic_ns() - start_ns) / _NS_PER_S
def _collect_secrets(self, request: BuildCacheRequest) -> tuple[str, ...]:
secrets: list[str] = []
if request.api_key:
secrets.append(request.api_key)
if self._config.flights_api_auth_token:
secrets.append(self._config.flights_api_auth_token)
return tuple(secrets)
# ---------------------------------------------------------------------------
# Module-level helpers
# ---------------------------------------------------------------------------
def _is_recognised(exc: BaseException, names: frozenset[str]) -> bool:
"""Return ``True`` iff any class in ``exc``'s MRO has a name in ``names``."""
return any(cls.__name__ in names for cls in type(exc).__mro__)
def _flight_source_kind(source: object) -> str:
if isinstance(source, FlightById):
return "flight_by_id"
if isinstance(source, FlightFromFile):
return "flight_from_file"
return "unknown"
def _bbox_kv(bbox: object) -> dict[str, float]:
return {
"min_lat_deg": getattr(bbox, "min_lat_deg", float("nan")),
"min_lon_deg": getattr(bbox, "min_lon_deg", float("nan")),
"max_lat_deg": getattr(bbox, "max_lat_deg", float("nan")),
"max_lon_deg": getattr(bbox, "max_lon_deg", float("nan")),
}
def _failure_reason_for_flight_resolve(exc: BaseException, request: BuildCacheRequest) -> str:
if isinstance(exc, EmptyWaypointsError):
return "empty waypoints; re-plan in Mission Planner UI"
if isinstance(exc, FlightNotFoundError):
source = request.flight_source
if isinstance(source, FlightById):
return f"flight not found: {source.flight_id}"
return "flight not found"
return f"{type(exc).__name__}: {exc!s}"
@@ -38,12 +38,22 @@ from uuid import UUID
import click
from gps_denied_onboard.components.c12_operator_tooling._types import (
BuildCacheOutcome,
BuildCacheRequest,
CacheBuildReport,
CompanionAddress,
FailurePhase,
FlightById,
FlightFromFile,
FlightSource,
SectorClassification,
)
from gps_denied_onboard.components.c12_operator_tooling.config import (
C12Config,
)
from gps_denied_onboard.components.c12_operator_tooling.errors import (
BuildLockHeldError,
CacheBuildError,
CompanionUnreachableError,
ContentHashMismatchError,
)
@@ -77,12 +87,6 @@ from gps_denied_onboard.components.c12_operator_tooling.flights_api.errors impor
FlightsApiUnreachableError,
WaypointSchemaError,
)
from gps_denied_onboard.components.c12_operator_tooling.flights_api.interface import (
FlightDto,
)
from gps_denied_onboard.components.c12_operator_tooling.freshness_table import (
freshness_threshold_months,
)
from gps_denied_onboard.components.c12_operator_tooling.sector_classification_store import (
SectorClassificationStore,
)
@@ -364,6 +368,25 @@ def download(ctx: click.Context, area: str, bbox: str) -> None:
required=True,
help="Path to the camera calibration JSON to upload alongside the cache.",
)
@click.option(
"--companion-host",
type=str,
required=True,
help="Companion hostname or IP for the SSH-driven C10 build phase.",
)
@click.option("--companion-port", type=int, default=22, help="Companion SSH port (default 22).")
@click.option(
"--satellite-provider-url",
type=str,
required=True,
help="The C11 satellite-provider URL the download phase fetches tiles from.",
)
@click.option(
"--api-key",
type=str,
required=True,
help="C11 satellite-provider API key (NEVER logged; AC-9 redaction guarantee).",
)
@click.pass_context
def build_cache(
ctx: click.Context,
@@ -371,9 +394,14 @@ def build_cache(
flight_file: Path | None,
sector_class: str,
calibration_path: Path,
companion_host: str,
companion_port: int,
satellite_provider_url: str,
api_key: str,
) -> None:
"""Orchestrate the F1 cache build (sibling AZ-328)."""
state = ctx.obj
config: C12Config = state["config"]
logger = state["logger"]
_emit_invoked(
logger,
@@ -382,6 +410,10 @@ def build_cache(
"flight_id": flight_id,
"flight_file": str(flight_file) if flight_file else None,
"sector_class": sector_class,
"companion_host": companion_host,
"companion_port": companion_port,
"satellite_provider_url": satellite_provider_url,
"api_key": "REDACTED",
},
)
@@ -401,46 +433,81 @@ def build_cache(
)
services = state.get("services")
if services is None or not hasattr(services, "flights_api_client"):
if services is None or not hasattr(services, "build_cache_orchestrator"):
_emit_ok(
logger,
"build-cache",
{"note": "no flights_api_client wired (composition-root pending)"},
{"note": "no build_cache_orchestrator wired (composition-root pending)"},
)
ctx.exit(EXIT_OK)
sector_class_enum = SectorClassification(sector_class.lower())
months = freshness_threshold_months(sector_class_enum)
flight_source: FlightSource
if flight_id is not None:
flight_source = FlightById(flight_id=UUID(flight_id))
else:
assert flight_file is not None # gated by the mutually-exclusive check
flight_source = FlightFromFile(path=flight_file)
request = BuildCacheRequest(
flight_source=flight_source,
sector_class=sector_class_enum,
calibration_path=calibration_path,
satellite_provider_url=satellite_provider_url,
api_key=api_key,
companion_address=CompanionAddress(host=companion_host, port=companion_port),
expected_engines=config.companion.expected_engines,
cache_root=config.build_cache.cache_staging_root,
zoom_levels=config.build_cache.zoom_levels,
)
try:
flight = _resolve_flight(services, flight_id=flight_id, flight_file=flight_file)
orchestrator = services.build_cache_orchestrator
orchestrator.build_cache(
flight=flight,
sector_class=sector_class_enum,
freshness_months=months,
calibration_path=calibration_path,
report: CacheBuildReport = services.build_cache_orchestrator.build_cache(request)
except BuildLockHeldError as exc:
_emit_error(
logger,
"build-cache",
exit_code=EXIT_LOCK_HELD,
exception=exc,
remediation=exc.remediation,
kv={"lock_path": str(exc.lock_path)},
)
click.echo(f"build-cache lock held: {exc.remediation}", err=True)
ctx.exit(EXIT_LOCK_HELD)
except CacheBuildError as exc:
_emit_error(
logger,
"build-cache",
exit_code=EXIT_BUILD_FAILURE,
exception=exc,
remediation=exc.remediation,
kv={"failure_phase": exc.failure_phase.value},
)
click.echo(f"cache build failed: {exc.remediation}", err=True)
ctx.exit(EXIT_BUILD_FAILURE)
except Exception as exc:
_handle_known_exception(
ctx,
logger,
"build-cache",
exc,
extra_table={
"BuildLockHeldError": (
EXIT_LOCK_HELD,
"Another build-cache run holds the lock; wait for it to finish.",
),
"CacheBuildError": (
EXIT_BUILD_FAILURE,
"Cache build failed; consult the orchestrator's structured log.",
),
},
)
return
_emit_ok(logger, "build-cache", {"flight_id": str(flight.flight_id)})
ctx.exit(EXIT_OK)
exit_code = _exit_code_for_report(report)
_emit_ok(
logger,
"build-cache",
{
"outcome": report.outcome.value,
"failure_phase": report.failure_phase.value,
"wall_clock_s": report.wall_clock_s,
"exit_code": exit_code,
},
)
if exit_code != EXIT_OK and report.failure_reason:
click.echo(f"cache build failed: {report.failure_reason}", err=True)
ctx.exit(exit_code)
@app.command(
@@ -679,23 +746,37 @@ def _handle_known_exception(
raise AssertionError("unreachable") # pragma: no cover
def _resolve_flight(
services: Any,
*,
flight_id: str | None,
flight_file: Path | None,
) -> FlightDto:
"""Resolve the operator's flight via the flights API or the offline file."""
client = services.flights_api_client
if flight_id is not None:
flight_uuid = UUID(flight_id)
return client.fetch_flight(
flight_id=flight_uuid,
base_url=getattr(services, "flights_api_base_url", ""),
auth_token=getattr(services, "flights_api_auth_token", ""),
_FLIGHT_RESOLVE_EXCEPTION_EXIT_CODES: dict[str, int] = {
"FlightsApiUnreachableError": EXIT_FLIGHTS_API_UNREACHABLE,
"FlightsApiAuthError": EXIT_FLIGHTS_API_AUTH,
"FlightNotFoundError": EXIT_FLIGHT_NOT_FOUND,
"FlightsApiSchemaError": EXIT_FLIGHT_SCHEMA,
"WaypointSchemaError": EXIT_FLIGHT_SCHEMA,
"FlightFileNotFoundError": EXIT_FLIGHT_SCHEMA,
"EmptyWaypointsError": EXIT_EMPTY_WAYPOINTS,
}
def _exit_code_for_report(report: CacheBuildReport) -> int:
"""Map a returned :class:`CacheBuildReport` to the documented exit code.
Success and idempotent-no-op both exit ``0`` (AC-7). For
``failure_phase=flight_resolve`` we route by the captured
``failure_exception_type`` so the granular AZ-489 exit codes are
preserved. Other phases collapse to the per-phase code documented
in :mod:`exit_codes`.
"""
if report.outcome in (BuildCacheOutcome.SUCCESS, BuildCacheOutcome.IDEMPOTENT_NO_OP):
return EXIT_OK
if report.failure_phase is FailurePhase.FLIGHT_RESOLVE:
return _FLIGHT_RESOLVE_EXCEPTION_EXIT_CODES.get(
report.failure_exception_type or "", EXIT_DOWNLOAD_FAILURE
)
assert flight_file is not None # narrowed by the mutually-exclusive gate
return client.load_flight_file(path=flight_file)
if report.failure_phase is FailurePhase.DOWNLOAD:
return EXIT_DOWNLOAD_FAILURE
if report.failure_phase is FailurePhase.BUILD:
return EXIT_BUILD_FAILURE
return EXIT_BUILD_FAILURE
# ---------------------------------------------------------------------------
@@ -27,6 +27,7 @@ from pathlib import Path, PurePosixPath
from gps_denied_onboard.config.schema import ConfigError
__all__ = [
"C12BuildCacheConfig",
"C12CompanionConfig",
"C12Config",
"HostKeyPolicy",
@@ -49,8 +50,12 @@ class HostKeyPolicy(str, Enum):
_DEFAULT_LOG_PATH = Path("~/.azaion/onboard/c12-tooling.log").expanduser()
_DEFAULT_SECTOR_STORE_PATH = Path("~/.azaion/onboard/sector-classifications.json").expanduser()
_DEFAULT_COMPANION_CACHE_ROOT = PurePosixPath("/var/lib/azaion/c10/cache")
_DEFAULT_CACHE_STAGING_ROOT = Path("~/.azaion/onboard/cache-staging").expanduser()
_DEFAULT_CONNECT_TIMEOUT_S = 10.0
_DEFAULT_SHA256SUM_TIMEOUT_S = 60.0
_DEFAULT_LOCK_TIMEOUT_S = 5.0
_DEFAULT_FLIGHT_BBOX_BUFFER_M = 1000.0
_DEFAULT_SSH_CONNECT_TIMEOUT_S = 30.0
@dataclass(frozen=True)
@@ -91,6 +96,68 @@ class C12CompanionConfig:
)
@dataclass(frozen=True)
class C12BuildCacheConfig:
"""Knobs consumed by :class:`BuildCacheOrchestrator` (AZ-328).
* ``cache_staging_root`` — workstation-side directory holding the
lockfile AND the C11 download journal/store; the orchestrator
``mkdir -p`` it on first use.
* ``lock_filename`` / ``lock_timeout_s`` — controls the cross-process
mutex per AZ-328 AC-5. Production uses ``filelock`` (CP-INV-4
parity with c10).
* ``companion_cache_root`` — POSIX path on the airborne companion
under which C10 builds the engines + descriptors + Manifest.
Forwarded to the remote C10 invoker.
* ``flight_bbox_buffer_m`` — horizontal-distance buffer applied to
the bbox derived from the resolved flight waypoints (FAC-INV-3).
* ``flights_api_base_url`` / ``flights_api_auth_token`` — the
operator's credentials for the parent-suite flights service. The
auth token MUST NOT be logged (AC-15); the orchestrator passes it
to ``flights_api_client.fetch_flight`` and otherwise treats it as
opaque.
* ``zoom_levels`` — slippy-map zoom levels to download per request;
defaults to a single zoom 18 grid which matches AC-NEW-1 imagery
resolution. Override per request via ``BuildCacheRequest``.
"""
cache_staging_root: Path = _DEFAULT_CACHE_STAGING_ROOT
lock_filename: str = ".c12.lock"
lock_timeout_s: float = _DEFAULT_LOCK_TIMEOUT_S
ssh_connect_timeout_s: float = _DEFAULT_SSH_CONNECT_TIMEOUT_S
companion_cache_root: PurePosixPath = _DEFAULT_COMPANION_CACHE_ROOT
flight_bbox_buffer_m: float = _DEFAULT_FLIGHT_BBOX_BUFFER_M
flights_api_base_url: str = ""
flights_api_auth_token: str = ""
zoom_levels: tuple[int, ...] = (18,)
def __post_init__(self) -> None:
if self.lock_timeout_s <= 0:
raise ConfigError(
f"C12BuildCacheConfig.lock_timeout_s must be > 0; got {self.lock_timeout_s}"
)
if self.ssh_connect_timeout_s <= 0:
raise ConfigError(
"C12BuildCacheConfig.ssh_connect_timeout_s must be > 0; "
f"got {self.ssh_connect_timeout_s}"
)
if self.flight_bbox_buffer_m < 0:
raise ConfigError(
"C12BuildCacheConfig.flight_bbox_buffer_m must be >= 0; "
f"got {self.flight_bbox_buffer_m}"
)
if not self.zoom_levels:
raise ConfigError(
"C12BuildCacheConfig.zoom_levels must contain at least one zoom level"
)
if any(z < 0 or z > 22 for z in self.zoom_levels):
raise ConfigError(
f"C12BuildCacheConfig.zoom_levels values must be in [0, 22]; got {self.zoom_levels}"
)
if not self.lock_filename:
raise ConfigError("C12BuildCacheConfig.lock_filename must be non-empty")
@dataclass(frozen=True)
class C12Config:
"""Per-component config for C12 operator tooling.
@@ -103,11 +170,14 @@ class C12Config:
:class:`SectorClassificationStore`. Defaults to
``~/.azaion/onboard/sector-classifications.json``.
* ``companion`` — nested AZ-327 SSH config block.
* ``build_cache`` — nested AZ-328 orchestrator knobs (lockfile,
flights service URL/token, bbox buffer).
"""
log_path: Path = _DEFAULT_LOG_PATH
sector_classification_store_path: Path = _DEFAULT_SECTOR_STORE_PATH
companion: C12CompanionConfig = field(default_factory=C12CompanionConfig)
build_cache: C12BuildCacheConfig = field(default_factory=C12BuildCacheConfig)
def __post_init__(self) -> None:
if not isinstance(self.companion, C12CompanionConfig):
@@ -115,3 +185,8 @@ class C12Config:
"C12Config.companion must be a C12CompanionConfig; got "
f"{type(self.companion).__name__}"
)
if not isinstance(self.build_cache, C12BuildCacheConfig):
raise ConfigError(
"C12Config.build_cache must be a C12BuildCacheConfig; got "
f"{type(self.build_cache).__name__}"
)
@@ -1,4 +1,4 @@
"""C12 ``CompanionBringup`` error hierarchy (AZ-327).
"""C12 ``CompanionBringup`` error hierarchy (AZ-327, AZ-328).
Two failure modes own dedicated exit codes in
:mod:`gps_denied_onboard.components.c12_operator_tooling.exit_codes`:
@@ -12,7 +12,19 @@ Two failure modes own dedicated exit codes in
"engine missing" (which is a not-ready signal returned in the
:class:`ReadinessReport`, not an exception).
Both errors expose a ``remediation`` property the
AZ-328 adds the ``BuildCacheOrchestrator`` family:
* :class:`CacheBuildError` — generic wrap of any download / verify-ready
/ build phase failure. Carries a ``failure_phase`` field and a
pre-baked ``remediation`` hint so the CLI can route by ``$?`` AND the
operator gets actionable text.
* :class:`BuildLockHeldError` — concurrent ``build-cache`` invocation
blocked by the workstation lockfile. Subclass of
:class:`CacheBuildError` so the CLI's exception table catches both.
* :class:`BuildReportParseError` — C10's stdout did not yield a parseable
``BuildReport`` JSON document; surfaced as ``failure_phase=build``.
All errors expose a ``remediation`` property the
:func:`gps_denied_onboard.components.c12_operator_tooling.cli.main`
layer reads to print a one-line operator-friendly hint to stderr.
@@ -25,11 +37,17 @@ discipline by keeping the hint table in c12.
from __future__ import annotations
from pathlib import Path
from gps_denied_onboard.components.c12_operator_tooling._types import (
CompanionUnreachableReason,
FailurePhase,
)
__all__ = [
"BuildLockHeldError",
"BuildReportParseError",
"CacheBuildError",
"CompanionUnreachableError",
"ContentHashMismatchError",
]
@@ -125,3 +143,126 @@ class ContentHashMismatchError(Exception):
"Re-run the cache build (`operator-tool build-cache --flight-id ...`) "
"to repopulate the affected engine."
)
# ---------------------------------------------------------------------------
# AZ-328: BuildCacheOrchestrator error family
# ---------------------------------------------------------------------------
_REMEDIATION_FLIGHT_RESOLVE: str = (
"Verify --flight-id (or --flight-file path), the flights service URL in config, "
"and the operator auth token. For a 404 the GUID is wrong; for an empty waypoint "
"list re-plan in the Mission Planner UI."
)
_REMEDIATION_DOWNLOAD: str = (
"Re-run with the same args; check `satellite_provider_url` and `api_key` "
"in the c11 config, and confirm the workstation has internet egress."
)
_REMEDIATION_BUILD: str = (
"Inspect the companion `~/.azaion/onboard/c10-build.log`; consider "
"`rm -rf <companion_cache_root>/engines/` to force a clean rebuild on the "
"next run, then re-issue the same `build-cache` command."
)
_REMEDIATION_NONE_FALLBACK: str = (
"No remediation hint registered for this failure phase; consult the structured "
"log for the wrapped exception details."
)
_REMEDIATIONS: dict[FailurePhase, str] = {
FailurePhase.FLIGHT_RESOLVE: _REMEDIATION_FLIGHT_RESOLVE,
FailurePhase.DOWNLOAD: _REMEDIATION_DOWNLOAD,
FailurePhase.BUILD: _REMEDIATION_BUILD,
}
class CacheBuildError(Exception):
"""Wrap any underlying C11 / C10 / SSH / parse failure with phase + remediation.
The orchestrator constructs this around every typed error its
collaborators raise so the CLI can route on ``failure_phase`` →
exit code without importing the upstream component's exception
hierarchy (AZ-507 cross-component rule).
"""
def __init__(
self,
*,
failure_phase: FailurePhase,
wrapped_exception_repr: str,
message: str | None = None,
remediation: str | None = None,
) -> None:
if failure_phase is FailurePhase.NONE:
raise ValueError(
"CacheBuildError must be raised for a real failure phase; "
"FailurePhase.NONE is reserved for successful CacheBuildReport."
)
rendered = message or (
f"cache build failed in phase {failure_phase.value}: {wrapped_exception_repr}"
)
super().__init__(rendered)
self.failure_phase = failure_phase
self.wrapped_exception_repr = wrapped_exception_repr
self._remediation = remediation or _REMEDIATIONS.get(
failure_phase, _REMEDIATION_NONE_FALLBACK
)
@property
def remediation(self) -> str:
"""Operator-friendly one-line hint, varies by ``failure_phase``."""
return self._remediation
class BuildLockHeldError(CacheBuildError):
"""Another ``build-cache`` invocation holds the workstation lockfile.
Subclass of :class:`CacheBuildError` so the CLI's existing exception
table picks it up; carries a phase-overriding remediation that
points at the lock path so the operator can recover deterministically.
"""
def __init__(self, *, lock_path: Path, timeout_s: float) -> None:
super().__init__(
failure_phase=FailurePhase.DOWNLOAD,
wrapped_exception_repr=f"LockTimeout(path={lock_path!s}, timeout_s={timeout_s})",
message=(
f"build-cache lock held: another `operator-tool build-cache` is in "
f"progress (lock={lock_path}, waited {timeout_s:.1f} s)"
),
remediation=(
f"Another `build-cache` is in progress; wait for it to finish, or "
f"kill the holding process and remove `{lock_path}` if it is stale."
),
)
self.lock_path = lock_path
self.timeout_s = timeout_s
class BuildReportParseError(CacheBuildError):
"""C10's companion-side stdout did not contain a parseable BuildReport JSON.
The C10 process likely crashed mid-output or printed garbage; the
operator needs to inspect the captured tail and the companion's own
log file. ``failure_phase=build`` per AZ-328 Risk 3 mitigation.
"""
def __init__(self, *, stdout_tail: str, stderr_tail: str) -> None:
super().__init__(
failure_phase=FailurePhase.BUILD,
wrapped_exception_repr=(
f"BuildReportParseError(stdout_tail={stdout_tail[:200]!r}, "
f"stderr_tail={stderr_tail[:200]!r})"
),
message=(
"remote C10 build did not emit a parseable BuildReport JSON line; "
"the process likely crashed or printed garbage"
),
remediation=(
"Inspect the companion `~/.azaion/onboard/c10-build.log` for the "
"underlying crash; the captured stdout/stderr tail is on the error "
"object's `wrapped_exception_repr`."
),
)
self.stdout_tail = stdout_tail
self.stderr_tail = stderr_tail
@@ -0,0 +1,109 @@
"""Workstation-side file-lock protocols + ``filelock``-backed concrete (AZ-328).
The C12 ``BuildCacheOrchestrator`` acquires ``cache_staging_root/.c12.lock``
to serialise concurrent operator runs of ``operator-tool build-cache``
(description.md § 7). C10's own lockfile lives on the companion under
``companion_cache_root/.c10.lock`` (CP-INV-4) — these are independent;
the workstation lock prevents two workstation processes from racing on
the C6 cache root, the companion lock prevents two companion processes
from racing on the engines+manifest root.
Why a separate factory rather than reusing c10's: the AZ-507 cross-
component rule forbids importing ``c10_provisioning`` from
``c12_operator_tooling``. Both factories thinly wrap the same
``filelock`` library; the contract Protocol below is the consumer-side
cut for c12.
The Protocol intentionally mirrors c10's shape (``try_lock(path,
*, timeout_s) -> AbstractContextManager[None]``) so a future move to a
shared ``helpers/file_lock.py`` is a one-line API change here.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from pathlib import Path
from typing import Protocol, runtime_checkable
import filelock
__all__ = [
"FileLock",
"FileLockFactory",
"FilelockFileLockFactory",
"LockTimeout",
]
class LockTimeout(Exception):
"""Raised by :meth:`FileLockFactory.try_lock` on timeout.
Local exception (not ``filelock.Timeout``) so ``BuildCacheOrchestrator``
catches it without importing the third-party ``filelock`` exception
class through the consumer-side cut.
"""
def __init__(self, *, path: Path, timeout_s: float) -> None:
super().__init__(f"failed to acquire lock at {path} within {timeout_s:.1f} s")
self.path = path
self.timeout_s = timeout_s
@runtime_checkable
class FileLock(Protocol):
"""Context-manager handle to an acquired file lock."""
def __enter__(self) -> FileLock: ...
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: object | None,
) -> None: ...
@runtime_checkable
class FileLockFactory(Protocol):
"""Construct a :class:`FileLock` against a filesystem path."""
def try_lock(self, path: Path, *, timeout_s: float) -> AbstractContextManager[None]:
"""Acquire ``path`` within ``timeout_s`` or raise :class:`LockTimeout`."""
...
class FilelockFileLockFactory:
"""Production :class:`FileLockFactory` — wraps the ``filelock`` library.
``filelock.FileLock`` uses ``fcntl.flock`` on POSIX; the OS auto-
releases the lock on process death (kill -9, parent exit), giving us
the AZ-328 AC-6 "lockfile released even on KeyboardInterrupt"
invariant for free.
"""
def try_lock(self, path: Path, *, timeout_s: float) -> AbstractContextManager[None]:
path.parent.mkdir(parents=True, exist_ok=True)
lock = filelock.FileLock(str(path))
try:
lock.acquire(timeout=timeout_s)
except filelock.Timeout as exc:
raise LockTimeout(path=path, timeout_s=timeout_s) from exc
return _AcquiredFileLockHandle(lock)
class _AcquiredFileLockHandle:
"""Internal CM wrapper that releases the underlying ``filelock.FileLock`` on exit."""
def __init__(self, lock: filelock.FileLock) -> None:
self._lock = lock
def __enter__(self) -> None:
return None
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: object | None,
) -> None:
self._lock.release()
@@ -47,7 +47,10 @@ def parse_flight_payload(payload: Any, *, source_label: str) -> FlightDto:
waypoints = tuple(
sorted(
(_parse_waypoint(item, index, source_label) for index, item in enumerate(waypoints_raw)),
(
_parse_waypoint(item, index, source_label)
for index, item in enumerate(waypoints_raw)
),
key=lambda wp: wp.ordinal,
)
)
@@ -80,9 +83,7 @@ def _parse_waypoint(item: Any, source_index: int, source_label: str) -> Waypoint
objective = _parse_enum(
item, "objective", WaypointObjective, f"{source_label} waypoint #{source_index}"
)
source = _parse_enum(
item, "source", WaypointSource, f"{source_label} waypoint #{source_index}"
)
source = _parse_enum(item, "source", WaypointSource, f"{source_label} waypoint #{source_index}")
return WaypointDto(
ordinal=ordinal,
lat_deg=lat_deg,
@@ -93,9 +94,7 @@ def _parse_waypoint(item: Any, source_index: int, source_label: str) -> Waypoint
)
def _enforce_contiguous_ordinals(
waypoints: tuple[WaypointDto, ...], source_label: str
) -> None:
def _enforce_contiguous_ordinals(waypoints: tuple[WaypointDto, ...], source_label: str) -> None:
for expected, wp in enumerate(waypoints):
if wp.ordinal != expected:
raise WaypointSchemaError(
@@ -143,8 +142,7 @@ def _require_int(payload: dict[str, Any], field: str, source_label: str) -> int:
value = payload[field]
if isinstance(value, bool) or not isinstance(value, int):
raise WaypointSchemaError(
f"{source_label}: field {field!r} must be an integer; "
f"got {type(value).__name__}"
f"{source_label}: field {field!r} must be an integer; got {type(value).__name__}"
)
return value
@@ -163,9 +161,7 @@ def _require_finite_float(payload: dict[str, Any], field: str, source_label: str
return fvalue
def _parse_enum(
payload: dict[str, Any], field: str, enum_cls: type, source_label: str
) -> Any:
def _parse_enum(payload: dict[str, Any], field: str, enum_cls: type, source_label: str) -> Any:
if field not in payload:
raise WaypointSchemaError(f"{source_label}: missing required field {field!r}")
raw = payload[field]
@@ -62,12 +62,8 @@ def bbox_from_waypoints(
sw_enu = WgsConverter.latlonalt_to_local_enu(origin, sw)
ne_enu = WgsConverter.latlonalt_to_local_enu(origin, ne)
sw_inflated_enu = np.array(
[sw_enu[0] - buffer_m, sw_enu[1] - buffer_m, 0.0], dtype=np.float64
)
ne_inflated_enu = np.array(
[ne_enu[0] + buffer_m, ne_enu[1] + buffer_m, 0.0], dtype=np.float64
)
sw_inflated_enu = np.array([sw_enu[0] - buffer_m, sw_enu[1] - buffer_m, 0.0], dtype=np.float64)
ne_inflated_enu = np.array([ne_enu[0] + buffer_m, ne_enu[1] + buffer_m, 0.0], dtype=np.float64)
sw_inflated = WgsConverter.local_enu_to_latlonalt(origin, sw_inflated_enu)
ne_inflated = WgsConverter.local_enu_to_latlonalt(origin, ne_inflated_enu)
@@ -41,7 +41,5 @@ def load_flight_file(*, path: Path) -> FlightDto:
try:
payload = orjson.loads(raw)
except orjson.JSONDecodeError as exc:
raise FlightsApiSchemaError(
f"flight file {path!s}: not valid JSON: {exc}"
) from exc
raise FlightsApiSchemaError(f"flight file {path!s}: not valid JSON: {exc}") from exc
return parse_flight_payload(payload, source_label=f"flight file {path!s}")
@@ -81,9 +81,7 @@ class HttpxFlightsApiClient:
sleep: Callable[[float], None] | None = None,
) -> None:
self._transport = transport
self._sleep: Callable[[float], None] = (
sleep if sleep is not None else _wall_clock_sleep
)
self._sleep: Callable[[float], None] = sleep if sleep is not None else _wall_clock_sleep
self._log = get_logger("c12.flights_api")
def fetch_flight(
@@ -0,0 +1,239 @@
"""``RemoteCacheProvisionerInvoker`` — SSH-side C10 build entry (AZ-328).
Runs C10's companion-side build entry over the operator's SSH session
and parses the ``BuildReport`` JSON document the C10 process emits as
the LAST line of its stdout. Streams the intermediate stdout lines to
the structured logger at DEBUG level (``kind="c10.remote.progress"``)
so a long-running build is observable from the operator workstation
without buffering hours of output in memory.
AZ-507 cross-component cut: this module does NOT import
``c10_provisioning``. The C10 ``BuildReport`` shape arrives as a JSON
document on the wire and is parsed into the local
:class:`RemoteBuildReport` mirror declared in ``_types.py``.
AZ-328 Risk 5 mitigation: the ``api_key`` and ``flights_api_auth_token``
secrets passed by the orchestrator are surfaced through a redactor that
replaces the literal token in any forwarded stdout line with
``<REDACTED>`` — defence-in-depth in case the companion's C10 process
echoes something it should not.
"""
from __future__ import annotations
import json
import logging
import shlex
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from uuid import UUID
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard.components.c12_operator_tooling._types import (
RemoteBuildOutcome,
RemoteBuildReport,
SectorClassification,
)
from gps_denied_onboard.components.c12_operator_tooling.errors import (
BuildReportParseError,
)
from gps_denied_onboard.components.c12_operator_tooling.ssh_session import (
SshSession,
)
__all__ = [
"REDACTED_PLACEHOLDER",
"RemoteBuildRequest",
"RemoteCacheProvisionerInvoker",
"build_remote_command",
]
REDACTED_PLACEHOLDER: str = "<REDACTED>"
_LOG_KIND_PROGRESS = "c10.remote.progress"
_LOG_KIND_INVOKE_START = "c12.remote_c10.invoke.start"
_LOG_KIND_INVOKE_OK = "c12.remote_c10.invoke.ok"
_LOG_KIND_INVOKE_FAILED = "c12.remote_c10.invoke.failed"
# Companion-side entry shipped by AZ-325 (E-BOOT scaffolding). The C10
# build mode is an azaion-onboard subcommand; the request is fed via
# stdin as JSON so we do not have to escape the bbox/zoom/sector args
# through the shell quoting layer twice.
_REMOTE_ENTRY: str = "azaion-onboard c10 build --json-output --request-stdin"
@dataclass(frozen=True, slots=True)
class RemoteBuildRequest:
"""Shape forwarded to C10's companion-side build entry as a JSON document.
``takeoff_origin`` + ``flight_id`` are the ADR-010 pass-through
fields C10 / AZ-323 bake into the Manifest so a re-planned flight
produces a fresh cache identity (CP-INV-8 on the C10 side).
"""
bbox: BoundingBox
zoom_levels: tuple[int, ...]
sector_class: SectorClassification
calibration_path: Path
expected_engines: tuple[str, ...]
companion_cache_root: PurePosixPath
takeoff_origin: LatLonAlt
flight_id: UUID
def build_remote_command() -> str:
"""Return the shell command run on the companion (kept module-level for testability)."""
return _REMOTE_ENTRY
def _serialise_request(request: RemoteBuildRequest) -> str:
"""Render :class:`RemoteBuildRequest` to a single-line JSON document."""
payload = {
"bbox": {
"min_lat_deg": request.bbox.min_lat_deg,
"min_lon_deg": request.bbox.min_lon_deg,
"max_lat_deg": request.bbox.max_lat_deg,
"max_lon_deg": request.bbox.max_lon_deg,
},
"zoom_levels": list(request.zoom_levels),
"sector_class": request.sector_class.value,
"calibration_path": str(request.calibration_path),
"expected_engines": list(request.expected_engines),
"companion_cache_root": str(request.companion_cache_root),
"takeoff_origin": {
"lat_deg": request.takeoff_origin.lat_deg,
"lon_deg": request.takeoff_origin.lon_deg,
"alt_m": request.takeoff_origin.alt_m,
},
"flight_id": str(request.flight_id),
}
return json.dumps(payload, separators=(",", ":"))
def _redact(line: str, secrets: Sequence[str]) -> str:
redacted = line
for secret in secrets:
if secret:
redacted = redacted.replace(secret, REDACTED_PLACEHOLDER)
return redacted
def _parse_build_report(json_line: str) -> RemoteBuildReport:
payload = json.loads(json_line)
outcome_raw = payload.get("outcome")
if outcome_raw not in {o.value for o in RemoteBuildOutcome}:
raise ValueError(f"BuildReport.outcome={outcome_raw!r} is not a known RemoteBuildOutcome")
return RemoteBuildReport(
outcome=RemoteBuildOutcome(outcome_raw),
engines_built=int(payload.get("engines_built", 0)),
engines_reused=int(payload.get("engines_reused", 0)),
descriptors_generated=int(payload.get("descriptors_generated", 0)),
manifest_hash=payload.get("manifest_hash"),
failure_reason=payload.get("failure_reason"),
elapsed_s=float(payload.get("elapsed_s", 0.0)),
)
class RemoteCacheProvisionerInvoker:
"""Run C10's companion-side build entry over an open SSH session.
Stateless; one invocation per :meth:`invoke` call. The orchestrator
holds a single instance per CLI invocation so the redactor's secrets
list does not have to leak through the interface.
"""
def __init__(self, *, logger: logging.Logger, command_timeout_s: float = 7200.0) -> None:
self._logger = logger
self._command_timeout_s = command_timeout_s
def invoke(
self,
session: SshSession,
request: RemoteBuildRequest,
*,
secrets_to_redact: Sequence[str] = (),
) -> RemoteBuildReport:
"""Run C10's build entry on ``session`` and return the parsed report.
``secrets_to_redact`` is the list of operator-supplied bearer
tokens / api_keys the caller wants stripped from any forwarded
DEBUG line — defence-in-depth against a misbehaving C10 echoing
them. Empty by default; the orchestrator always passes the
request's ``api_key`` and the configured flights auth token in.
"""
request_json = _serialise_request(request)
self._logger.info(
"starting remote C10 build invocation",
extra={
"kind": _LOG_KIND_INVOKE_START,
"kv": {
"flight_id": str(request.flight_id),
"sector_class": request.sector_class.value,
"zoom_levels": list(request.zoom_levels),
},
},
)
# The companion entry reads the request from stdin so we avoid
# secondary shell quoting of the JSON payload through ``ssh``.
# paramiko's ``run`` Protocol method does NOT take stdin, so we
# echo the JSON into the entry via a here-doc-equivalent shell
# construct: ``printf '%s' '<JSON>' | <entry>`` with single-quote
# escaping handled by ``shlex.quote``.
wrapped_command = f"printf %s {shlex.quote(request_json)} | {build_remote_command()}"
try:
result = session.run(wrapped_command, timeout_s=self._command_timeout_s)
except Exception as exc:
self._logger.error(
"remote C10 invocation failed at the SSH layer",
extra={
"kind": _LOG_KIND_INVOKE_FAILED,
"kv": {"exception_type": type(exc).__name__},
},
)
raise
# Stream forwarded stdout lines as DEBUG with redaction. We
# deliberately walk the lines top-to-bottom so the LAST line is
# the one we attempt to parse as the BuildReport JSON document
# (description.md § 8 + AZ-328 spec § Outcome).
stdout_lines = result.stdout.splitlines()
secrets = list(secrets_to_redact)
for line in stdout_lines[:-1] if stdout_lines else ():
self._logger.debug(
"remote c10 progress",
extra={
"kind": _LOG_KIND_PROGRESS,
"kv": {"line": _redact(line, secrets)},
},
)
if result.exit_code != 0 or not stdout_lines:
raise BuildReportParseError(
stdout_tail=_redact(result.stdout[-2048:], secrets),
stderr_tail=_redact(result.stderr[-2048:], secrets),
)
last_line = stdout_lines[-1].strip()
try:
report = _parse_build_report(last_line)
except (json.JSONDecodeError, ValueError) as exc:
raise BuildReportParseError(
stdout_tail=_redact(result.stdout[-2048:], secrets),
stderr_tail=_redact(result.stderr[-2048:], secrets),
) from exc
self._logger.info(
"remote C10 build invocation returned a parseable BuildReport",
extra={
"kind": _LOG_KIND_INVOKE_OK,
"kv": {
"outcome": report.outcome.value,
"engines_built": report.engines_built,
"engines_reused": report.engines_reused,
"descriptors_generated": report.descriptors_generated,
"manifest_hash": report.manifest_hash,
},
},
)
return report
@@ -0,0 +1,38 @@
"""C12 consumer-side structural cut of c11 ``TileDownloader`` (AZ-507).
The AZ-507 cross-component rule (see ``_docs/02_document/module-layout.md``
line 252) forbids ``c12_operator_tooling/*.py`` from importing
``components.c11_tile_manager`` directly. The ``BuildCacheOrchestrator``
needs the download surface to drive the F1 download phase, so we
declare a local Protocol that mirrors the shape of c11's
:class:`gps_denied_onboard.components.c11_tile_manager.interface.TileDownloader`
``download_tiles_for_area`` method.
The composition root (``runtime_root.c12_factory``) wires the concrete
c11 strategy in. Tests inject a fake that returns a
:class:`DownloadBatchReportCut` directly, so they never touch c11 either.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from gps_denied_onboard.components.c12_operator_tooling._types import (
DownloadBatchReportCut,
DownloadRequestCut,
)
__all__ = ["TileDownloaderCut"]
@runtime_checkable
class TileDownloaderCut(Protocol):
"""Single-method consumer-side cut of c11 ``TileDownloader``.
The orchestrator constructs a :class:`DownloadRequestCut` and the
composition-root wiring in ``c12_factory`` translates it into c11's
real ``DownloadRequest`` (and the returned ``DownloadBatchReport``
back into a :class:`DownloadBatchReportCut`).
"""
def download_tiles_for_area(self, request: DownloadRequestCut) -> DownloadBatchReportCut: ...
@@ -6,13 +6,19 @@
classification map.
* :func:`build_companion_bringup` — AZ-327 SSH-based pre-flight
verification of the companion's four artifacts.
* :func:`build_build_cache_orchestrator` — AZ-328 F1 cache-build
orchestrator. Wires the ``filelock`` factory + the remote C10 invoker
+ the c11 ``TileDownloader`` adapter on top of the existing AZ-326 /
AZ-327 / AZ-489 services. The AZ-507 cross-component cut means we
translate c11's real ``DownloadRequest`` / ``DownloadBatchReport`` to
the local ``DownloadRequestCut`` / ``DownloadBatchReportCut`` here.
* :func:`build_operator_tool` — aggregator that returns the
:class:`OperatorToolServices` dataclass the AZ-326 CLI consumes.
Each ``build_*`` function is intentionally tiny — there is one
production strategy per service today and the CLI wiring just plugs
the concrete instance into the same composition root method. Sibling
tasks AZ-328 / AZ-329 / AZ-330 will each add a single field to
tasks AZ-329 / AZ-330 will each add a single field to
:class:`OperatorToolServices` without renaming or moving the
dataclass.
"""
@@ -23,9 +29,16 @@ import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c12_operator_tooling.build_cache import (
BuildCacheOrchestrator,
)
from gps_denied_onboard.components.c12_operator_tooling.companion_bringup import (
CompanionBringup,
)
from gps_denied_onboard.components.c12_operator_tooling.file_lock import (
FilelockFileLockFactory,
)
from gps_denied_onboard.components.c12_operator_tooling.flights_api import (
FlightsApiClient,
HttpxFlightsApiClient,
@@ -33,12 +46,18 @@ from gps_denied_onboard.components.c12_operator_tooling.flights_api import (
from gps_denied_onboard.components.c12_operator_tooling.paramiko_ssh_session import (
ParamikoSshSessionFactory,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_c10_invoker import (
RemoteCacheProvisionerInvoker,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_sidecar_verifier import (
RemoteSidecarVerifier,
)
from gps_denied_onboard.components.c12_operator_tooling.sector_classification_store import (
SectorClassificationStore,
)
from gps_denied_onboard.components.c12_operator_tooling.tile_downloader_cut import (
TileDownloaderCut,
)
if TYPE_CHECKING:
from gps_denied_onboard.components.c12_operator_tooling.config import (
@@ -48,6 +67,7 @@ if TYPE_CHECKING:
__all__ = [
"OperatorToolServices",
"build_build_cache_orchestrator",
"build_companion_bringup",
"build_flights_api_client",
"build_operator_tool",
@@ -57,6 +77,8 @@ __all__ = [
_C12_LOGGER_NAME = "c12_operator_tooling"
_COMPANION_LOGGER_NAME = "c12_operator_tooling.companion_bringup"
_BUILD_CACHE_LOGGER_NAME = "c12_operator_tooling.build_cache"
_REMOTE_C10_LOGGER_NAME = "c12_operator_tooling.remote_c10_invoker"
@dataclass(frozen=True)
@@ -65,15 +87,22 @@ class OperatorToolServices:
AZ-326 introduced the dataclass and now owns three services
(``flights_api_client``, ``sector_classification_store``,
``companion_bringup``). Sibling tasks AZ-328 (orchestrator),
AZ-329 (post-landing upload), and AZ-330 (operator reloc service)
extend this dataclass in-place by appending their own service
field — they MUST NOT rename, move, or split it.
``companion_bringup``). AZ-328 added ``build_cache_orchestrator``.
Sibling tasks AZ-329 (post-landing upload) and AZ-330 (operator
reloc service) extend this dataclass in-place by appending their
own service field — they MUST NOT rename, move, or split it.
``build_cache_orchestrator`` is ``None`` when the AZ-328 wiring is
not requested (e.g. unit tests for AZ-326 / AZ-327 that don't go
through the full build path); the CLI's ``build-cache`` subcommand
short-circuits with an EXIT_OK + log when the field is missing /
None so the rest of the CLI keeps working.
"""
flights_api_client: FlightsApiClient
sector_classification_store: SectorClassificationStore
companion_bringup: CompanionBringup
build_cache_orchestrator: BuildCacheOrchestrator | None = None
def build_flights_api_client(config: Config) -> FlightsApiClient:
@@ -130,13 +159,86 @@ def build_companion_bringup(
)
def build_operator_tool(config: Config) -> OperatorToolServices:
"""Aggregate the three AZ-326 / AZ-327 / AZ-489 service handles."""
return OperatorToolServices(
def build_build_cache_orchestrator(
config: Config,
*,
services: OperatorToolServices,
tile_downloader: TileDownloaderCut,
clock: Clock,
logger: logging.Logger | None = None,
) -> BuildCacheOrchestrator:
"""Build the AZ-328 :class:`BuildCacheOrchestrator` from config + sibling services.
Caller (production runtime root) is responsible for translating the
real c11 ``TileDownloader`` to a :class:`TileDownloaderCut` adapter
here — ``c12_operator_tooling`` cannot import c11 directly per
AZ-507. The lockfile factory + remote-C10 invoker + SSH factory are
constructed in-place; the SSH factory MUST be the same instance as
the one wired into ``services.companion_bringup`` (single
composition-root construction per AZ-328 Constraints).
"""
c12_config = _resolve_c12_config(config)
companion = c12_config.companion
if not str(companion.ssh_keyfile):
from gps_denied_onboard.config.schema import ConfigError
raise ConfigError(
"C12CompanionConfig.ssh_keyfile is empty; AZ-328 build_cache_orchestrator "
"requires a real SSH private key path"
)
ssh_factory = ParamikoSshSessionFactory(
ssh_user=companion.ssh_user,
ssh_keyfile=companion.ssh_keyfile,
host_key_policy=companion.host_key_policy,
)
invoker_logger = logger or logging.getLogger(_REMOTE_C10_LOGGER_NAME)
orchestrator_logger = logger or logging.getLogger(_BUILD_CACHE_LOGGER_NAME)
return BuildCacheOrchestrator(
flights_api_client=services.flights_api_client,
tile_downloader=tile_downloader,
companion_bringup=services.companion_bringup,
remote_c10_invoker=RemoteCacheProvisionerInvoker(logger=invoker_logger),
ssh_factory=ssh_factory,
lock_factory=FilelockFileLockFactory(),
logger=orchestrator_logger,
clock=clock,
config=c12_config.build_cache,
)
def build_operator_tool(
config: Config,
*,
tile_downloader: TileDownloaderCut | None = None,
clock: Clock | None = None,
) -> OperatorToolServices:
"""Aggregate the AZ-326 / AZ-327 / AZ-328 / AZ-489 service handles.
``tile_downloader`` and ``clock`` are optional — without them, the
``build_cache_orchestrator`` field is left as ``None`` and the CLI's
``build-cache`` subcommand short-circuits gracefully. Production
wiring (the suite-level runtime root) supplies real instances.
"""
base = OperatorToolServices(
flights_api_client=build_flights_api_client(config),
sector_classification_store=build_sector_classification_store(config),
companion_bringup=build_companion_bringup(config),
)
if tile_downloader is None or clock is None:
return base
orchestrator = build_build_cache_orchestrator(
config,
services=base,
tile_downloader=tile_downloader,
clock=clock,
)
return OperatorToolServices(
flights_api_client=base.flights_api_client,
sector_classification_store=base.sector_classification_store,
companion_bringup=base.companion_bringup,
build_cache_orchestrator=orchestrator,
)
def _resolve_c12_config(config: Config) -> C12Config:
@@ -71,11 +71,13 @@ def _three_waypoint_payload(*, flight_id: UUID = FLIGHT_ID) -> dict[str, object]
"flight_id": str(flight_id),
"name": "derkachi-sweep",
"waypoints": [
_waypoint_payload(ordinal=0, lat_deg=50.0, lon_deg=36.2, alt_m=200.0,
objective="takeoff"),
_waypoint_payload(
ordinal=0, lat_deg=50.0, lon_deg=36.2, alt_m=200.0, objective="takeoff"
),
_waypoint_payload(ordinal=1, lat_deg=50.01, lon_deg=36.22, alt_m=210.0),
_waypoint_payload(ordinal=2, lat_deg=50.02, lon_deg=36.24, alt_m=220.0,
objective="landing"),
_waypoint_payload(
ordinal=2, lat_deg=50.02, lon_deg=36.24, alt_m=220.0, objective="landing"
),
],
}
@@ -130,9 +132,7 @@ def test_ac1_online_happy_path_returns_three_waypoint_flight(
_, buffer = capture_flights_api_logs
# Act
flight = client.fetch_flight(
flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN
)
flight = client.fetch_flight(flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN)
# Assert
assert isinstance(flight, FlightDto)
@@ -221,9 +221,7 @@ def test_ac4_online_503_then_200_retries_once_and_succeeds(
_, buffer = capture_flights_api_logs
# Act
flight = client.fetch_flight(
flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN
)
flight = client.fetch_flight(flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN)
# Assert
assert isinstance(flight, FlightDto)
@@ -478,9 +476,7 @@ def test_ac13_online_and_offline_produce_equal_dtos(tmp_path: Path) -> None:
flight_file.write_bytes(json.dumps(payload).encode())
# Act
online_dto = client.fetch_flight(
flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN
)
online_dto = client.fetch_flight(flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN)
offline_dto = load_flight_file(path=flight_file)
# Assert
@@ -505,9 +501,7 @@ def test_ac14_shuffled_ordinals_are_returned_in_sorted_order() -> None:
client, _ = _make_client_with_handler(handler)
# Act
flight = client.fetch_flight(
flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN
)
flight = client.fetch_flight(flight_id=FLIGHT_ID, base_url=BASE_URL, auth_token=AUTH_TOKEN)
# Assert
assert tuple(w.ordinal for w in flight.waypoints) == (0, 1, 2)
@@ -0,0 +1,968 @@
"""AZ-328 — ``BuildCacheOrchestrator`` AC-1 .. AC-15 + NFR-perf-overhead.
Every fake collaborator records call counts so the sequencing and
"never-called" assertions land. The fakes never spawn real network /
SSH activity; the integration paths (paramiko + httpx) are exercised
elsewhere by AZ-489's wire tests + AZ-327's smoke test.
"""
from __future__ import annotations
import logging
import time
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from uuid import UUID
import pytest
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard.components.c12_operator_tooling import (
BuildCacheOrchestrator,
BuildCacheOutcome,
BuildCacheRequest,
BuildLockHeldError,
C12BuildCacheConfig,
C12CompanionConfig,
CacheBuildError,
CompanionAddress,
CompanionBringup,
CompanionUnreachableError,
CompanionUnreachableReason,
ContentHashMismatchError,
DownloadBatchReportCut,
DownloadOutcomeCut,
DownloadRequestCut,
EmptyWaypointsError,
FailurePhase,
FlightById,
FlightFromFile,
FlightNotFoundError,
FlightsApiUnreachableError,
HostKeyPolicy,
ReadinessOutcome,
ReadinessReport,
RemoteBuildOutcome,
RemoteBuildReport,
RemoteCacheProvisionerInvoker,
SectorClassification,
WaypointDto,
WaypointObjective,
WaypointSource,
)
from gps_denied_onboard.components.c12_operator_tooling.file_lock import LockTimeout
from gps_denied_onboard.components.c12_operator_tooling.flights_api.interface import (
FlightDto,
FlightsApiClient,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_c10_invoker import (
RemoteBuildRequest,
)
from gps_denied_onboard.components.c12_operator_tooling.ssh_session import (
RemoteCommandResult,
SshSession,
SshSessionFactory,
)
from gps_denied_onboard.components.c12_operator_tooling.tile_downloader_cut import (
TileDownloaderCut,
)
# ---------------------------------------------------------------------------
# Constants + helpers
# ---------------------------------------------------------------------------
_FLIGHT_ID = UUID("12345678-1234-1234-1234-123456789012")
_API_KEY = "super-secret-api-key"
_AUTH_TOKEN = "bearer-xyz-token"
_SAT_URL = "https://satellite.example.com"
_COMPANION = CompanionAddress(host="companion.local", port=22)
def _flight() -> FlightDto:
return FlightDto(
flight_id=_FLIGHT_ID,
name="happy-path",
waypoints=(
WaypointDto(
ordinal=0,
lat_deg=50.0,
lon_deg=36.2,
alt_m=200.0,
objective=WaypointObjective.TAKEOFF,
source=WaypointSource.OPERATOR,
),
WaypointDto(
ordinal=1,
lat_deg=50.05,
lon_deg=36.25,
alt_m=210.0,
objective=WaypointObjective.WAYPOINT,
source=WaypointSource.OPERATOR,
),
WaypointDto(
ordinal=2,
lat_deg=50.0,
lon_deg=36.3,
alt_m=215.0,
objective=WaypointObjective.LANDING,
source=WaypointSource.OPERATOR,
),
),
)
def _bbox() -> BoundingBox:
return BoundingBox(
min_lat_deg=49.99,
min_lon_deg=36.19,
max_lat_deg=50.06,
max_lon_deg=36.31,
)
def _request(
*,
flight_source=None,
sector_class: SectorClassification = SectorClassification.STABLE_REAR,
cache_root: Path | None = None,
) -> BuildCacheRequest:
return BuildCacheRequest(
flight_source=flight_source or FlightById(flight_id=_FLIGHT_ID),
sector_class=sector_class,
calibration_path=Path("/tmp/calibration.json"),
satellite_provider_url=_SAT_URL,
api_key=_API_KEY,
companion_address=_COMPANION,
expected_engines=("dinov2_vpr", "alike"),
cache_root=cache_root or Path("/tmp/cache_root"),
zoom_levels=(18,),
)
# ---------------------------------------------------------------------------
# Fakes
# ---------------------------------------------------------------------------
@dataclass
class _FakeFlightsApiClient(FlightsApiClient):
flight: FlightDto | None = None
fetch_calls: int = 0
load_calls: int = 0
bbox_calls: int = 0
takeoff_calls: int = 0
fetch_raises: Exception | None = None
load_raises: Exception | None = None
bbox_raises: Exception | None = None
bbox_value: BoundingBox = field(default_factory=_bbox)
captured_auth_tokens: list[str] = field(default_factory=list)
def fetch_flight(
self, *, flight_id, base_url, auth_token, timeout_s: float = 10.0
) -> FlightDto:
self.fetch_calls += 1
self.captured_auth_tokens.append(auth_token)
if self.fetch_raises is not None:
raise self.fetch_raises
assert self.flight is not None
return self.flight
def load_flight_file(self, *, path: Path) -> FlightDto:
self.load_calls += 1
if self.load_raises is not None:
raise self.load_raises
assert self.flight is not None
return self.flight
def bbox_from_waypoints(self, waypoints, *, buffer_m: float = 1000.0) -> BoundingBox:
self.bbox_calls += 1
if self.bbox_raises is not None:
raise self.bbox_raises
return self.bbox_value
def takeoff_origin_from_flight(self, flight: FlightDto) -> LatLonAlt:
self.takeoff_calls += 1
first = flight.waypoints[0]
return LatLonAlt(lat_deg=first.lat_deg, lon_deg=first.lon_deg, alt_m=first.alt_m)
@dataclass
class _FakeTileDownloader(TileDownloaderCut):
raises: Exception | None = None
report: DownloadBatchReportCut | None = None
calls: int = 0
captured_request: DownloadRequestCut | None = None
def download_tiles_for_area(self, request: DownloadRequestCut) -> DownloadBatchReportCut:
self.calls += 1
self.captured_request = request
if self.raises is not None:
raise self.raises
assert self.report is not None
return self.report
class _FakeSession(SshSession):
def __init__(self) -> None:
self.close_calls = 0
def run(self, command: str, *, timeout_s: float) -> RemoteCommandResult:
return RemoteCommandResult(exit_code=0, stdout="{}", stderr="")
def file_exists(self, remote_path: PurePosixPath) -> bool:
return False
def list_dir(self, remote_path: PurePosixPath) -> list[str]:
return []
def close(self) -> None:
self.close_calls += 1
@dataclass
class _FakeSshFactory(SshSessionFactory):
session: _FakeSession | None = None
open_calls: int = 0
open_raises: Exception | None = None
def open(self, address: CompanionAddress, *, timeout_s: float) -> SshSession:
self.open_calls += 1
if self.open_raises is not None:
raise self.open_raises
if self.session is None:
self.session = _FakeSession()
return self.session
@dataclass
class _FakeBringup:
"""Stand-in for :class:`CompanionBringup` (typed via duck-typing)."""
readiness: ReadinessReport | None = None
raises: Exception | None = None
calls: int = 0
def verify_companion_ready(self, address: CompanionAddress) -> ReadinessReport:
self.calls += 1
if self.raises is not None:
raise self.raises
if self.readiness is not None:
return self.readiness
return ReadinessReport(
manifest_present=True,
content_hashes_pass=True,
engines_present=True,
calibration_present=True,
outcome=ReadinessOutcome.READY,
not_ready_reasons=(),
companion_cache_root="/var/lib/azaion/c10/cache",
engines_inspected_count=2,
)
@dataclass
class _FakeRemoteInvoker:
"""Stand-in for :class:`RemoteCacheProvisionerInvoker`."""
report: RemoteBuildReport | None = None
raises: Exception | None = None
calls: int = 0
captured_request: RemoteBuildRequest | None = None
captured_secrets: tuple[str, ...] = ()
def invoke(
self,
session: SshSession,
request: RemoteBuildRequest,
*,
secrets_to_redact=(),
) -> RemoteBuildReport:
self.calls += 1
self.captured_request = request
self.captured_secrets = tuple(secrets_to_redact)
if self.raises is not None:
raise self.raises
if self.report is None:
return RemoteBuildReport(
outcome=RemoteBuildOutcome.SUCCESS,
engines_built=2,
engines_reused=0,
descriptors_generated=128,
manifest_hash="abc123",
failure_reason=None,
elapsed_s=5.5,
)
return self.report
class _FakeFileLockHandle(AbstractContextManager[None]):
def __init__(self, factory: _FakeLockFactory) -> None:
self._factory = factory
def __enter__(self) -> None:
return None
def __exit__(self, exc_type, exc, tb) -> None:
self._factory.exit_calls += 1
@dataclass
class _FakeLockFactory:
"""Records lock-acquire / release calls; can simulate timeout."""
raise_timeout: bool = False
acquire_calls: int = 0
exit_calls: int = 0
captured_paths: list[Path] = field(default_factory=list)
def try_lock(self, path: Path, *, timeout_s: float) -> AbstractContextManager[None]:
self.acquire_calls += 1
self.captured_paths.append(path)
if self.raise_timeout:
raise LockTimeout(path=path, timeout_s=timeout_s)
return _FakeFileLockHandle(self)
class _FakeClock:
def __init__(self) -> None:
self._t = 0
def monotonic_ns(self) -> int:
self._t += 1_000_000 # 1 ms tick per call
return self._t
def time_ns(self) -> int:
return self._t
def sleep_until_ns(self, deadline_ns: int) -> None: # pragma: no cover
return None
@dataclass
class _Fakes:
flights: _FakeFlightsApiClient
downloader: _FakeTileDownloader
bringup: _FakeBringup
invoker: _FakeRemoteInvoker
ssh_factory: _FakeSshFactory
lock_factory: _FakeLockFactory
logger: logging.Logger
log_records: list[logging.LogRecord]
@pytest.fixture
def fakes(tmp_path: Path) -> _Fakes:
flights = _FakeFlightsApiClient(flight=_flight())
downloader = _FakeTileDownloader(
report=DownloadBatchReportCut(
outcome=DownloadOutcomeCut.SUCCESS,
tiles_requested=12,
tiles_downloaded=12,
)
)
bringup = _FakeBringup()
invoker = _FakeRemoteInvoker()
ssh_factory = _FakeSshFactory()
lock_factory = _FakeLockFactory()
logger = logging.getLogger(f"test_build_cache_{tmp_path.name}")
logger.handlers.clear()
logger.propagate = False
log_records: list[logging.LogRecord] = []
class _Handler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
log_records.append(record)
handler = _Handler(level=logging.DEBUG)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return _Fakes(
flights=flights,
downloader=downloader,
bringup=bringup,
invoker=invoker,
ssh_factory=ssh_factory,
lock_factory=lock_factory,
logger=logger,
log_records=log_records,
)
@pytest.fixture
def config(tmp_path: Path) -> C12BuildCacheConfig:
return C12BuildCacheConfig(
cache_staging_root=tmp_path / "staging",
lock_timeout_s=0.5,
ssh_connect_timeout_s=2.0,
flights_api_base_url="https://flights.example.com",
flights_api_auth_token=_AUTH_TOKEN,
zoom_levels=(18,),
)
def _orchestrator(fakes: _Fakes, config: C12BuildCacheConfig) -> BuildCacheOrchestrator:
return BuildCacheOrchestrator(
flights_api_client=fakes.flights,
tile_downloader=fakes.downloader,
companion_bringup=fakes.bringup, # type: ignore[arg-type]
remote_c10_invoker=fakes.invoker, # type: ignore[arg-type]
ssh_factory=fakes.ssh_factory,
lock_factory=fakes.lock_factory,
logger=fakes.logger,
clock=_FakeClock(),
config=config,
)
def _kinds(fakes: _Fakes) -> list[str]:
return [r.__dict__.get("kind") for r in fakes.log_records]
def _has_substring_in_any_log(fakes: _Fakes, needle: str) -> bool:
for record in fakes.log_records:
if needle in record.getMessage():
return True
for value in record.__dict__.values():
if isinstance(value, str) and needle in value:
return True
if isinstance(value, dict):
for v in value.values():
if isinstance(v, str) and needle in v:
return True
return False
# ---------------------------------------------------------------------------
# AC-1 — Happy path
# ---------------------------------------------------------------------------
class TestAc1HappyPath:
def test_full_pipeline_returns_success(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.SUCCESS
assert report.failure_phase is FailurePhase.NONE
assert report.flight_resolve_report is not None
assert report.download_report is not None
assert report.build_report is not None
# Sequencing — every fake hit exactly once in the right order.
assert fakes.flights.fetch_calls == 1
assert fakes.flights.bbox_calls == 1
assert fakes.flights.takeoff_calls == 1
assert fakes.lock_factory.acquire_calls == 1
assert fakes.downloader.calls == 1
assert fakes.bringup.calls == 1
assert fakes.ssh_factory.open_calls == 1
assert fakes.invoker.calls == 1
assert fakes.lock_factory.exit_calls == 1
def test_emits_three_required_info_logs(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
kinds = _kinds(fakes)
assert kinds.count("c12.build_cache.flight_resolve.start") == 1
assert kinds.count("c12.build_cache.start") == 1
assert kinds.count("c12.build_cache.success") == 1
# ---------------------------------------------------------------------------
# AC-2 — Download failure aborts before C10
# ---------------------------------------------------------------------------
class SatelliteProviderError(Exception):
"""In-test stand-in for c11's SatelliteProviderError (recognised by name)."""
class TestAc2DownloadFailureAborts:
def test_returns_failure_report_and_skips_downstream(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.downloader.raises = SatelliteProviderError("503 Service Unavailable")
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.DOWNLOAD
assert report.download_report is None
assert report.build_report is None
assert "503" in (report.failure_reason or "")
assert fakes.bringup.calls == 0
assert fakes.invoker.calls == 0
assert fakes.lock_factory.exit_calls == 1
assert "c12.build_cache.download.failed" in _kinds(fakes)
# ---------------------------------------------------------------------------
# AC-3 — Verify-ready failure aborts before C10
# ---------------------------------------------------------------------------
class TestAc3VerifyReadyFailureAborts:
def test_not_ready_returns_failure_and_skips_invoker(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.bringup.readiness = ReadinessReport(
manifest_present=False,
content_hashes_pass=False,
engines_present=False,
calibration_present=False,
outcome=ReadinessOutcome.NOT_READY,
not_ready_reasons=("manifest missing",),
companion_cache_root="/var/lib/azaion/c10/cache",
engines_inspected_count=0,
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.DOWNLOAD
assert "manifest missing" in (report.failure_reason or "")
assert fakes.invoker.calls == 0
assert fakes.lock_factory.exit_calls == 1
assert "c12.build_cache.companion.not_ready" in _kinds(fakes)
# ---------------------------------------------------------------------------
# AC-4 — Build failure surfaces failure_phase=build
# ---------------------------------------------------------------------------
class EngineBuildError(Exception):
"""In-test stand-in for c10's EngineBuildError (recognised by name)."""
class TestAc4BuildFailure:
def test_invoker_raises_recognised_error_returns_failure(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.invoker.raises = EngineBuildError("CUDA OOM on backbone dinov2_vpr")
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.BUILD
assert report.download_report is not None
assert report.build_report is None
assert "CUDA OOM" in (report.failure_reason or "")
assert fakes.lock_factory.exit_calls == 1
assert "c12.build_cache.build.failed" in _kinds(fakes)
def test_cache_build_error_remediation_mentions_cleanup(self) -> None:
err = CacheBuildError(
failure_phase=FailurePhase.BUILD,
wrapped_exception_repr="EngineBuildError(...)",
)
assert "cleanup" in err.remediation.lower() or "rm -rf" in err.remediation
# ---------------------------------------------------------------------------
# AC-5 — Lockfile prevents concurrent runs
# ---------------------------------------------------------------------------
class TestAc5LockHeld:
def test_timeout_raises_build_lock_held_error(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.lock_factory.raise_timeout = True
orchestrator = _orchestrator(fakes, config)
with pytest.raises(BuildLockHeldError) as exc_info:
orchestrator.build_cache(_request())
assert exc_info.value.failure_phase is FailurePhase.DOWNLOAD
assert fakes.downloader.calls == 0
assert fakes.bringup.calls == 0
assert fakes.invoker.calls == 0
assert "c12.build_cache.lock.held" in _kinds(fakes)
# ---------------------------------------------------------------------------
# AC-6 — Lockfile released even on unexpected exception
# ---------------------------------------------------------------------------
class TestAc6LockReleasedOnException:
def test_runtime_error_propagates_lock_released(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
# RuntimeError is NOT in any phase's recognised-name set, so it
# propagates per AC-6.
fakes.downloader.raises = RuntimeError("unexpected")
orchestrator = _orchestrator(fakes, config)
with pytest.raises(RuntimeError):
orchestrator.build_cache(_request())
assert fakes.lock_factory.exit_calls == 1
# ---------------------------------------------------------------------------
# AC-7 — Idempotent no-op surfaces correctly
# ---------------------------------------------------------------------------
class TestAc7IdempotentNoOp:
def test_idempotent_outcome_is_returned(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.invoker.report = RemoteBuildReport(
outcome=RemoteBuildOutcome.IDEMPOTENT_NO_OP,
engines_built=0,
engines_reused=2,
descriptors_generated=0,
manifest_hash="cached-hash",
failure_reason=None,
elapsed_s=0.1,
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.IDEMPOTENT_NO_OP
assert report.failure_phase is FailurePhase.NONE
assert report.failure_reason is None
assert "c12.build_cache.idempotent" in _kinds(fakes)
# ---------------------------------------------------------------------------
# AC-8 — remediation text per failure_phase
# ---------------------------------------------------------------------------
class TestAc8RemediationTextPerPhase:
def test_download_remediation_mentions_re_run(self) -> None:
err = CacheBuildError(failure_phase=FailurePhase.DOWNLOAD, wrapped_exception_repr="...")
assert "Re-run" in err.remediation
def test_build_remediation_mentions_cleanup(self) -> None:
err = CacheBuildError(failure_phase=FailurePhase.BUILD, wrapped_exception_repr="...")
assert "rm -rf" in err.remediation or "cleanup" in err.remediation.lower()
def test_lock_held_remediation_mentions_lock_path(self) -> None:
err = BuildLockHeldError(lock_path=Path("/tmp/.c12.lock"), timeout_s=5.0)
assert "/tmp/.c12.lock" in err.remediation
# ---------------------------------------------------------------------------
# AC-9 — api_key never leaks into log output
# ---------------------------------------------------------------------------
class TestAc9ApiKeyRedaction:
def test_no_log_record_contains_api_key(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
assert not _has_substring_in_any_log(fakes, _API_KEY)
def test_secrets_forwarded_to_invoker_for_redaction(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
assert _API_KEY in fakes.invoker.captured_secrets
assert _AUTH_TOKEN in fakes.invoker.captured_secrets
# ---------------------------------------------------------------------------
# AC-10 — Aggregated CacheBuildReport carries all sub-reports on success
# ---------------------------------------------------------------------------
class TestAc10AggregatedReport:
def test_success_report_carries_all_fields(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
fr = report.flight_resolve_report
assert fr is not None
assert fr.flight_id == _FLIGHT_ID
assert fr.waypoint_count == 3
assert fr.bbox.min_lat_deg < fr.bbox.max_lat_deg
assert fr.takeoff_origin.lat_deg == 50.0
assert fr.raw_flight_dto is not None
dr = report.download_report
assert dr is not None
assert dr.tiles_downloaded == 12
br = report.build_report
assert br is not None
assert br.engines_built == 2
assert report.wall_clock_s > 0
# ---------------------------------------------------------------------------
# AC-11 — Flight-resolve failure aborts BEFORE the lockfile
# ---------------------------------------------------------------------------
class TestAc11FlightResolveBeforeLock:
def test_flight_not_found_skips_lock_and_downstream(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.flights.fetch_raises = FlightNotFoundError(f"flight not found: {_FLIGHT_ID}")
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.FLIGHT_RESOLVE
assert report.flight_resolve_report is None
assert fakes.lock_factory.acquire_calls == 0
assert fakes.downloader.calls == 0
assert fakes.bringup.calls == 0
assert fakes.invoker.calls == 0
assert "c12.build_cache.flight_resolve.failed" in _kinds(fakes)
def test_flights_api_unreachable_also_aborts_pre_lock(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.flights.fetch_raises = FlightsApiUnreachableError("service unavailable")
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.failure_phase is FailurePhase.FLIGHT_RESOLVE
assert fakes.lock_factory.acquire_calls == 0
# ---------------------------------------------------------------------------
# AC-12 — Offline FlightFromFile path
# ---------------------------------------------------------------------------
class TestAc12FlightFromFile:
def test_load_flight_file_called_when_source_is_file(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
request = _request(flight_source=FlightFromFile(path=Path("/tmp/flight.json")))
report = orchestrator.build_cache(request)
assert report.outcome is BuildCacheOutcome.SUCCESS
assert fakes.flights.load_calls == 1
assert fakes.flights.fetch_calls == 0
# ---------------------------------------------------------------------------
# AC-13 — takeoff_origin + flight_id forwarded to invoker
# ---------------------------------------------------------------------------
class TestAc13TakeoffOriginForwarded:
def test_invoker_received_takeoff_origin_and_flight_id(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
captured = fakes.invoker.captured_request
assert captured is not None
assert captured.takeoff_origin == LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0)
assert captured.flight_id == _FLIGHT_ID
# ---------------------------------------------------------------------------
# AC-14 — EmptyWaypointsError surfaces with failure_phase=flight_resolve
# ---------------------------------------------------------------------------
class TestAc14EmptyWaypoints:
def test_empty_waypoints_aborts_pre_lock(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.flights.bbox_raises = EmptyWaypointsError("no waypoints in flight")
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.FLIGHT_RESOLVE
assert "empty waypoints" in (report.failure_reason or "")
assert fakes.lock_factory.acquire_calls == 0
# ---------------------------------------------------------------------------
# AC-15 — auth_token never leaks into log output (Phase 0)
# ---------------------------------------------------------------------------
class TestAc15AuthTokenRedaction:
def test_no_log_record_contains_auth_token(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
assert not _has_substring_in_any_log(fakes, _AUTH_TOKEN)
def test_auth_token_passed_to_fetch_flight(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
# Sanity check — the token IS forwarded, just not logged.
orchestrator = _orchestrator(fakes, config)
orchestrator.build_cache(_request())
assert fakes.flights.captured_auth_tokens == [_AUTH_TOKEN]
# ---------------------------------------------------------------------------
# Verify-ready typed exception path (CompanionUnreachableError catch)
# ---------------------------------------------------------------------------
class TestVerifyReadyTypedExceptions:
def test_companion_unreachable_returns_failure_phase_download(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.bringup.raises = CompanionUnreachableError(
host="companion.local",
port=22,
reason=CompanionUnreachableReason.CONNECT_REFUSED,
underlying_exception_repr="ECONNREFUSED",
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.DOWNLOAD
assert fakes.invoker.calls == 0
def test_content_hash_mismatch_returns_failure_phase_download(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.bringup.raises = ContentHashMismatchError(
engine_path="/var/lib/azaion/c10/cache/engines/dinov2_vpr.engine",
expected_sha256_hex="a" * 64,
actual_sha256_hex="b" * 64,
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.failure_phase is FailurePhase.DOWNLOAD
assert fakes.invoker.calls == 0
# ---------------------------------------------------------------------------
# Download report.outcome=FAILURE → CacheBuildReport(failure_phase=download)
# ---------------------------------------------------------------------------
class TestDownloadReportOutcomeFailure:
def test_outcome_failure_in_download_report_returns_failure(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.downloader.report = DownloadBatchReportCut(
outcome=DownloadOutcomeCut.FAILURE,
tiles_requested=12,
tiles_downloaded=0,
failure_reason="rate limit budget exceeded",
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.DOWNLOAD
assert report.failure_reason == "rate limit budget exceeded"
assert fakes.invoker.calls == 0
# ---------------------------------------------------------------------------
# Build report.outcome=FAILURE → CacheBuildReport(failure_phase=build)
# ---------------------------------------------------------------------------
class TestBuildReportOutcomeFailure:
def test_build_outcome_failure_in_report(
self, fakes: _Fakes, config: C12BuildCacheConfig
) -> None:
fakes.invoker.report = RemoteBuildReport(
outcome=RemoteBuildOutcome.FAILURE,
engines_built=1,
engines_reused=0,
descriptors_generated=0,
manifest_hash=None,
failure_reason="empty C6 corpus",
elapsed_s=2.0,
)
orchestrator = _orchestrator(fakes, config)
report = orchestrator.build_cache(_request())
assert report.outcome is BuildCacheOutcome.FAILURE
assert report.failure_phase is FailurePhase.BUILD
assert report.failure_reason == "empty C6 corpus"
assert report.build_report is not None # the failed report IS captured
# ---------------------------------------------------------------------------
# NFR-perf-overhead — orchestrator-only path, all-fake collaborators x 100
# ---------------------------------------------------------------------------
class TestNfrPerfOverhead:
def test_microbench_p99_under_50ms(self, fakes: _Fakes, config: C12BuildCacheConfig) -> None:
# Use real wall clock (not _FakeClock — it would skew elapsed_s
# but the test measures wall time, not orchestrator-reported s).
from gps_denied_onboard.clock import wall_clock as _wc
orchestrator = BuildCacheOrchestrator(
flights_api_client=fakes.flights,
tile_downloader=fakes.downloader,
companion_bringup=fakes.bringup, # type: ignore[arg-type]
remote_c10_invoker=fakes.invoker, # type: ignore[arg-type]
ssh_factory=fakes.ssh_factory,
lock_factory=fakes.lock_factory,
logger=fakes.logger,
clock=_wc.WallClock(),
config=config,
)
# Warm-up.
orchestrator.build_cache(_request())
durations_ms: list[float] = []
for _ in range(100):
start = time.perf_counter()
orchestrator.build_cache(_request())
durations_ms.append((time.perf_counter() - start) * 1000)
durations_ms.sort()
p99 = durations_ms[int(0.99 * len(durations_ms)) - 1]
assert p99 < 50.0, f"NFR-perf-overhead p99={p99:.2f} ms exceeded 50 ms budget"
# ---------------------------------------------------------------------------
# Composition-root smoke — services dataclass plumbs build_cache_orchestrator
# ---------------------------------------------------------------------------
class TestCompositionRootSmoke:
def test_companion_bringup_real_class_attaches(self, tmp_path: Path) -> None:
# Reasonable smoke: real CompanionBringup with a fake SSH factory
# constructs without raising; the orchestrator pulls the same
# instance via the services dataclass.
from gps_denied_onboard.components.c12_operator_tooling.remote_sidecar_verifier import (
RemoteSidecarVerifier,
)
fake_factory = _FakeSshFactory()
bringup = CompanionBringup(
ssh_factory=fake_factory,
sidecar_verifier=RemoteSidecarVerifier(timeout_s=5.0),
logger=logging.getLogger("test_smoke"),
config=C12CompanionConfig(
ssh_keyfile=Path(tmp_path / "key"),
host_key_policy=HostKeyPolicy.STRICT,
),
)
assert bringup is not None
# Real RemoteCacheProvisionerInvoker constructs cleanly too.
invoker = RemoteCacheProvisionerInvoker(logger=logging.getLogger("test"))
assert invoker is not None
@@ -1,8 +1,20 @@
"""AZ-326 — `build-cache` happy + unhappy paths (AC-11 .. AC-17, AC-3 mapping)."""
"""AZ-326 + AZ-328 — `build-cache` CLI happy + unhappy paths.
After AZ-328 the CLI no longer resolves the flight itself — it builds
a :class:`BuildCacheRequest` and hands it to the
:class:`BuildCacheOrchestrator` injected via the services dataclass.
The flight resolve happens inside the orchestrator.
The flag-mapping ACs from AZ-326 (AC-11 .. AC-17) are still enforced
here: the test fakes assert that the orchestrator received the right
request shape, and that ``CacheBuildReport.failure_exception_type``
fields drive the documented exit-code mapping.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from types import SimpleNamespace
from typing import Any
@@ -12,99 +24,58 @@ import pytest
from click.testing import CliRunner
from gps_denied_onboard.components.c12_operator_tooling import (
EXIT_BUILD_FAILURE,
EXIT_DOWNLOAD_FAILURE,
EXIT_EMPTY_WAYPOINTS,
EXIT_FLIGHT_NOT_FOUND,
EXIT_FLIGHTS_API_AUTH,
EXIT_LOCK_HELD,
EXIT_OK,
EXIT_USAGE,
BuildCacheOutcome,
BuildCacheRequest,
BuildLockHeldError,
C12Config,
EmptyWaypointsError,
FlightDto,
FlightNotFoundError,
FlightsApiAuthError,
CacheBuildReport,
FailurePhase,
FlightById,
FlightFromFile,
SectorClassification,
WaypointDto,
WaypointObjective,
WaypointSource,
)
from gps_denied_onboard.components.c12_operator_tooling.cli import app
_FLIGHT_ID = UUID("00000000-0000-0000-0000-000000000001")
_API_KEY = "super-secret-api-key"
_SAT_URL = "https://satellite.example.com"
def _three_waypoint_flight() -> FlightDto:
return FlightDto(
flight_id=_FLIGHT_ID,
name="test-flight",
waypoints=tuple(
WaypointDto(
ordinal=i,
lat_deg=50.0 + i * 0.01,
lon_deg=36.0 + i * 0.01,
alt_m=100.0,
objective=(WaypointObjective.TAKEOFF if i == 0 else WaypointObjective.WAYPOINT),
source=WaypointSource.OPERATOR,
)
for i in range(3)
),
)
class _FakeFlightsApiClient:
"""Records `fetch_flight` / `load_flight_file` invocations."""
def __init__(
self,
*,
fetch_returns: FlightDto | None = None,
fetch_raises: Exception | None = None,
load_returns: FlightDto | None = None,
) -> None:
self._fetch_returns = fetch_returns
self._fetch_raises = fetch_raises
self._load_returns = load_returns
self.fetch_calls: list[dict[str, Any]] = []
self.load_calls: list[Path] = []
def fetch_flight(
self,
*,
flight_id: UUID,
base_url: str,
auth_token: str,
timeout_s: float = 10.0,
) -> FlightDto:
self.fetch_calls.append(
{"flight_id": flight_id, "base_url": base_url, "auth_token": auth_token}
)
if self._fetch_raises is not None:
raise self._fetch_raises
assert self._fetch_returns is not None
return self._fetch_returns
def load_flight_file(self, *, path: Path) -> FlightDto:
self.load_calls.append(path)
assert self._load_returns is not None
return self._load_returns
@dataclass
class _FakeOrchestrator:
def __init__(self) -> None:
self.calls: list[dict[str, Any]] = []
"""Records the :class:`BuildCacheRequest` and returns a scripted report."""
def build_cache(self, **kwargs: Any) -> None:
self.calls.append(kwargs)
return_report: CacheBuildReport | None = None
raise_on_call: Exception | None = None
captured: list[BuildCacheRequest] = field(default_factory=list)
def build_cache(self, request: BuildCacheRequest) -> CacheBuildReport:
self.captured.append(request)
if self.raise_on_call is not None:
raise self.raise_on_call
if self.return_report is not None:
return self.return_report
return CacheBuildReport(
outcome=BuildCacheOutcome.SUCCESS,
failure_phase=FailurePhase.NONE,
flight_resolve_report=None,
download_report=None,
build_report=None,
failure_reason=None,
wall_clock_s=0.1,
)
def _make_services(
*,
flights_client: _FakeFlightsApiClient,
orchestrator: _FakeOrchestrator | None = None,
) -> SimpleNamespace:
def _make_services(orchestrator: _FakeOrchestrator | None = None) -> SimpleNamespace:
return SimpleNamespace(
flights_api_client=flights_client,
flights_api_base_url="https://flights.test",
flights_api_auth_token="redacted-token",
build_cache_orchestrator=orchestrator or _FakeOrchestrator(),
)
@@ -116,12 +87,6 @@ def _invoke(
services: SimpleNamespace | None,
config: C12Config,
) -> Any:
"""Run ``operator-tool`` with a per-test ``services`` collaborator injected.
The CLI's top-level callback honours pre-populated ``ctx.obj`` dicts
of the form ``{"config": ..., "logger": ..., "services": ...}`` —
we build that dict here and pass it as ``obj=`` to ``CliRunner.invoke``.
"""
logger = logging.getLogger("test.c12.cli.build_cache")
logger.handlers.clear()
logger.addHandler(logging.NullHandler())
@@ -153,20 +118,31 @@ def calibration_path(tmp_path: Path) -> Path:
return p
class TestFlightIdHappyPath:
"""AC-11 — `--flight-id` resolves via fetch_flight and forwards FlightDto."""
def _required_args(calibration_path: Path) -> list[str]:
return [
"--calibration-path",
str(calibration_path),
"--companion-host",
"companion.local",
"--satellite-provider-url",
_SAT_URL,
"--api-key",
_API_KEY,
]
def test_orchestrator_called_with_resolved_dto(
class TestFlightIdHappyPath:
"""AC-11 — `--flight-id` builds a BuildCacheRequest with a `FlightById` source."""
def test_orchestrator_called_with_flight_by_id(
self,
runner: CliRunner,
base_config: C12Config,
calibration_path: Path,
) -> None:
# Arrange
flight = _three_waypoint_flight()
client = _FakeFlightsApiClient(fetch_returns=flight)
orchestrator = _FakeOrchestrator()
services = _make_services(flights_client=client, orchestrator=orchestrator)
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -177,8 +153,7 @@ class TestFlightIdHappyPath:
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -186,21 +161,20 @@ class TestFlightIdHappyPath:
# Assert
assert result.exit_code == EXIT_OK, result.output
assert len(client.fetch_calls) == 1
assert client.fetch_calls[0]["flight_id"] == _FLIGHT_ID
assert len(client.load_calls) == 0
assert len(orchestrator.calls) == 1
call = orchestrator.calls[0]
assert call["flight"] is flight
assert call["sector_class"] is SectorClassification.STABLE_REAR
assert call["freshness_months"] == 12 # AC-NEW-6 stable_rear default
assert call["calibration_path"] == calibration_path
assert len(orchestrator.captured) == 1
request = orchestrator.captured[0]
assert isinstance(request.flight_source, FlightById)
assert request.flight_source.flight_id == _FLIGHT_ID
assert request.sector_class is SectorClassification.STABLE_REAR
assert request.calibration_path == calibration_path
assert request.companion_address.host == "companion.local"
assert request.satellite_provider_url == _SAT_URL
class TestFlightFileHappyPath:
"""AC-12 — `--flight-file` uses the offline loader; no fetch."""
"""AC-12 — `--flight-file` builds a BuildCacheRequest with a `FlightFromFile` source."""
def test_load_file_called_fetch_not_called(
def test_request_has_flight_from_file_source(
self,
runner: CliRunner,
base_config: C12Config,
@@ -210,10 +184,8 @@ class TestFlightFileHappyPath:
# Arrange
flight_file = tmp_path / "flight.json"
flight_file.write_text("{}", encoding="utf-8")
flight = _three_waypoint_flight()
client = _FakeFlightsApiClient(load_returns=flight)
orchestrator = _FakeOrchestrator()
services = _make_services(flights_client=client, orchestrator=orchestrator)
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -224,8 +196,7 @@ class TestFlightFileHappyPath:
str(flight_file),
"--sector-class",
"active_conflict",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -233,10 +204,10 @@ class TestFlightFileHappyPath:
# Assert
assert result.exit_code == EXIT_OK, result.output
assert len(client.load_calls) == 1
assert client.load_calls[0] == flight_file
assert len(client.fetch_calls) == 0
assert orchestrator.calls[0]["freshness_months"] == 1 # active_conflict
request = orchestrator.captured[0]
assert isinstance(request.flight_source, FlightFromFile)
assert request.flight_source.path == flight_file
assert request.sector_class is SectorClassification.ACTIVE_CONFLICT
class TestMutuallyExclusiveFlags:
@@ -252,8 +223,8 @@ class TestMutuallyExclusiveFlags:
# Arrange
flight_file = tmp_path / "flight.json"
flight_file.write_text("{}", encoding="utf-8")
client = _FakeFlightsApiClient()
services = _make_services(flights_client=client)
orchestrator = _FakeOrchestrator()
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -266,8 +237,7 @@ class TestMutuallyExclusiveFlags:
str(flight_file),
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -275,8 +245,7 @@ class TestMutuallyExclusiveFlags:
# Assert
assert result.exit_code == EXIT_USAGE
assert len(client.fetch_calls) == 0
assert len(client.load_calls) == 0
assert len(orchestrator.captured) == 0
def test_neither_flag_set(
self,
@@ -285,8 +254,8 @@ class TestMutuallyExclusiveFlags:
calibration_path: Path,
) -> None:
# Arrange
client = _FakeFlightsApiClient()
services = _make_services(flights_client=client)
orchestrator = _FakeOrchestrator()
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -295,8 +264,7 @@ class TestMutuallyExclusiveFlags:
"build-cache",
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -304,11 +272,23 @@ class TestMutuallyExclusiveFlags:
# Assert
assert result.exit_code == EXIT_USAGE
assert len(client.fetch_calls) == 0
assert len(orchestrator.captured) == 0
class TestFlightsApiErrorMapping:
"""AC-15, AC-16, AC-17 + AC-3 — error → exit code; auth_token never logged."""
"""AC-15, AC-16, AC-17 — failure_exception_type drives granular exit code."""
def _failure_report(self, exception_name: str) -> CacheBuildReport:
return CacheBuildReport(
outcome=BuildCacheOutcome.FAILURE,
failure_phase=FailurePhase.FLIGHT_RESOLVE,
flight_resolve_report=None,
download_report=None,
build_report=None,
failure_reason=f"{exception_name}: simulated",
wall_clock_s=0.0,
failure_exception_type=exception_name,
)
def test_flight_not_found_maps_to_exit_62(
self,
@@ -317,8 +297,8 @@ class TestFlightsApiErrorMapping:
calibration_path: Path,
) -> None:
# Arrange
client = _FakeFlightsApiClient(fetch_raises=FlightNotFoundError("not found"))
services = _make_services(flights_client=client)
orchestrator = _FakeOrchestrator(return_report=self._failure_report("FlightNotFoundError"))
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -329,15 +309,14 @@ class TestFlightsApiErrorMapping:
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
)
# Assert
assert result.exit_code == EXIT_FLIGHT_NOT_FOUND
assert result.exit_code == EXIT_FLIGHT_NOT_FOUND, result.output
def test_auth_failure_maps_to_exit_61_and_no_token_in_log(
self,
@@ -346,8 +325,8 @@ class TestFlightsApiErrorMapping:
calibration_path: Path,
) -> None:
# Arrange
client = _FakeFlightsApiClient(fetch_raises=FlightsApiAuthError("denied"))
services = _make_services(flights_client=client)
orchestrator = _FakeOrchestrator(return_report=self._failure_report("FlightsApiAuthError"))
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -358,8 +337,7 @@ class TestFlightsApiErrorMapping:
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -369,7 +347,7 @@ class TestFlightsApiErrorMapping:
assert result.exit_code == EXIT_FLIGHTS_API_AUTH
if base_config.log_path.exists():
log_text = base_config.log_path.read_text(encoding="utf-8")
assert "redacted-token" not in log_text
assert _API_KEY not in log_text
def test_empty_waypoints_maps_to_exit_64(
self,
@@ -378,8 +356,8 @@ class TestFlightsApiErrorMapping:
calibration_path: Path,
) -> None:
# Arrange
client = _FakeFlightsApiClient(fetch_raises=EmptyWaypointsError("zero"))
services = _make_services(flights_client=client)
orchestrator = _FakeOrchestrator(return_report=self._failure_report("EmptyWaypointsError"))
services = _make_services(orchestrator)
# Act
result = _invoke(
@@ -390,8 +368,7 @@ class TestFlightsApiErrorMapping:
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
"--calibration-path",
str(calibration_path),
*_required_args(calibration_path),
],
services=services,
config=base_config,
@@ -399,3 +376,93 @@ class TestFlightsApiErrorMapping:
# Assert
assert result.exit_code == EXIT_EMPTY_WAYPOINTS
class TestOrchestratorErrorMapping:
"""AZ-328 — orchestrator-raised exceptions map to dedicated exit codes."""
def test_build_lock_held_maps_to_exit_50(
self,
runner: CliRunner,
base_config: C12Config,
calibration_path: Path,
tmp_path: Path,
) -> None:
# Arrange
orchestrator = _FakeOrchestrator(
raise_on_call=BuildLockHeldError(lock_path=tmp_path / ".c12.lock", timeout_s=5.0)
)
services = _make_services(orchestrator)
# Act
result = _invoke(
runner,
[
"build-cache",
"--flight-id",
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
*_required_args(calibration_path),
],
services=services,
config=base_config,
)
# Assert
assert result.exit_code == EXIT_LOCK_HELD
class TestCacheBuildReportExitCodes:
"""AZ-328 AC-7 — idempotent_no_op exits 0; failure phases map per table."""
def _report(self, outcome: BuildCacheOutcome, failure_phase: FailurePhase) -> CacheBuildReport:
return CacheBuildReport(
outcome=outcome,
failure_phase=failure_phase,
flight_resolve_report=None,
download_report=None,
build_report=None,
failure_reason=None,
wall_clock_s=0.0,
)
@pytest.mark.parametrize(
"outcome,failure_phase,expected_exit",
[
(BuildCacheOutcome.SUCCESS, FailurePhase.NONE, EXIT_OK),
(BuildCacheOutcome.IDEMPOTENT_NO_OP, FailurePhase.NONE, EXIT_OK),
(BuildCacheOutcome.FAILURE, FailurePhase.DOWNLOAD, EXIT_DOWNLOAD_FAILURE),
(BuildCacheOutcome.FAILURE, FailurePhase.BUILD, EXIT_BUILD_FAILURE),
],
)
def test_outcome_to_exit_code_table(
self,
runner: CliRunner,
base_config: C12Config,
calibration_path: Path,
outcome: BuildCacheOutcome,
failure_phase: FailurePhase,
expected_exit: int,
) -> None:
# Arrange
orchestrator = _FakeOrchestrator(return_report=self._report(outcome, failure_phase))
services = _make_services(orchestrator)
# Act
result = _invoke(
runner,
[
"build-cache",
"--flight-id",
str(_FLIGHT_ID),
"--sector-class",
"stable_rear",
*_required_args(calibration_path),
],
services=services,
config=base_config,
)
# Assert
assert result.exit_code == expected_exit, result.output
@@ -0,0 +1,57 @@
"""AZ-328 — ``FilelockFileLockFactory`` real-filelock smoke tests."""
from __future__ import annotations
from pathlib import Path
import pytest
from gps_denied_onboard.components.c12_operator_tooling import (
FilelockFileLockFactory,
LockTimeout,
)
class TestFilelockFileLockFactory:
def test_acquire_and_release(self, tmp_path: Path) -> None:
factory = FilelockFileLockFactory()
lock_path = tmp_path / ".c12.lock"
with factory.try_lock(lock_path, timeout_s=1.0):
# Re-acquire from the same process with a tight timeout —
# filelock is reentrant by holder process, so this MAY succeed
# without raising; what we care about is that the basic
# acquire/release contract works.
assert lock_path.exists()
# Lock file may persist on POSIX (it's the rendezvous file)
# but it should now be released and re-acquirable.
with factory.try_lock(lock_path, timeout_s=1.0):
pass
def test_concurrent_lock_raises_lock_timeout(self, tmp_path: Path) -> None:
# filelock IS process-aware, so two SEPARATE FileLock objects
# against the same path from the same process WILL contend on
# POSIX — verify the timeout path raises our LockTimeout.
from filelock import FileLock as RealFileLock
lock_path = tmp_path / ".c12.lock"
held = RealFileLock(str(lock_path))
held.acquire(timeout=1.0)
try:
factory = FilelockFileLockFactory()
with pytest.raises(LockTimeout) as exc_info:
# Tight timeout — the held lock must NOT be released by
# this assertion path or the test loses meaning.
with factory.try_lock(lock_path, timeout_s=0.05):
pass # pragma: no cover
assert exc_info.value.path == lock_path
assert exc_info.value.timeout_s == 0.05
finally:
held.release()
def test_creates_parent_directory(self, tmp_path: Path) -> None:
factory = FilelockFileLockFactory()
nested = tmp_path / "nested" / "deeper" / ".c12.lock"
with factory.try_lock(nested, timeout_s=1.0):
assert nested.parent.is_dir()
@@ -0,0 +1,228 @@
"""AZ-328 — ``RemoteCacheProvisionerInvoker`` JSON wire + redaction smoke."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from uuid import UUID
import pytest
from gps_denied_onboard._types.geo import BoundingBox, LatLonAlt
from gps_denied_onboard.components.c12_operator_tooling import (
BuildReportParseError,
RemoteBuildOutcome,
RemoteCacheProvisionerInvoker,
SectorClassification,
)
from gps_denied_onboard.components.c12_operator_tooling.remote_c10_invoker import (
REDACTED_PLACEHOLDER,
RemoteBuildRequest,
)
from gps_denied_onboard.components.c12_operator_tooling.ssh_session import (
RemoteCommandResult,
SshSession,
)
@dataclass
class _ScriptedSession(SshSession):
stdout_payload: str = "{}"
stderr_payload: str = ""
exit_code: int = 0
captured_command: str | None = None
close_calls: int = 0
def run(self, command: str, *, timeout_s: float) -> RemoteCommandResult:
self.captured_command = command
return RemoteCommandResult(
exit_code=self.exit_code,
stdout=self.stdout_payload,
stderr=self.stderr_payload,
)
def file_exists(self, remote_path: PurePosixPath) -> bool:
return False
def list_dir(self, remote_path: PurePosixPath) -> list[str]:
return []
def close(self) -> None:
self.close_calls += 1
def _request() -> RemoteBuildRequest:
return RemoteBuildRequest(
bbox=BoundingBox(
min_lat_deg=49.99, min_lon_deg=36.19, max_lat_deg=50.06, max_lon_deg=36.31
),
zoom_levels=(18,),
sector_class=SectorClassification.STABLE_REAR,
calibration_path=Path("/tmp/calibration.json"),
expected_engines=("dinov2_vpr",),
companion_cache_root=PurePosixPath("/var/lib/azaion/c10/cache"),
takeoff_origin=LatLonAlt(lat_deg=50.0, lon_deg=36.2, alt_m=200.0),
flight_id=UUID("12345678-1234-1234-1234-123456789012"),
)
@pytest.fixture
def captured_logs() -> tuple[logging.Logger, list[logging.LogRecord]]:
records: list[logging.LogRecord] = []
logger = logging.getLogger("test_remote_c10_invoker")
logger.handlers.clear()
logger.propagate = False
class _Handler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
records.append(record)
handler = _Handler(level=logging.DEBUG)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger, records
class TestParseHappyPath:
def test_returns_remote_build_report(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, _ = captured_logs
payload = {
"outcome": "success",
"engines_built": 2,
"engines_reused": 1,
"descriptors_generated": 100,
"manifest_hash": "abc123",
"failure_reason": None,
"elapsed_s": 12.5,
}
session = _ScriptedSession(stdout_payload=json.dumps(payload))
invoker = RemoteCacheProvisionerInvoker(logger=logger)
report = invoker.invoke(session, _request())
assert report.outcome is RemoteBuildOutcome.SUCCESS
assert report.engines_built == 2
assert report.engines_reused == 1
assert report.manifest_hash == "abc123"
class TestParseProgressLines:
def test_progress_lines_logged_at_debug(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, records = captured_logs
payload = json.dumps(
{
"outcome": "success",
"engines_built": 1,
"engines_reused": 0,
"descriptors_generated": 50,
"manifest_hash": "h",
"failure_reason": None,
"elapsed_s": 1.0,
}
)
session = _ScriptedSession(
stdout_payload="progress: 10%\nprogress: 50%\nprogress: 100%\n" + payload
)
invoker = RemoteCacheProvisionerInvoker(logger=logger)
invoker.invoke(session, _request())
progress_records = [r for r in records if r.__dict__.get("kind") == "c10.remote.progress"]
assert len(progress_records) == 3
class TestRedaction:
def test_secret_in_progress_line_is_redacted(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, records = captured_logs
secret = "leaked-token-xyz"
payload = json.dumps(
{
"outcome": "success",
"engines_built": 0,
"engines_reused": 0,
"descriptors_generated": 0,
"manifest_hash": "h",
"failure_reason": None,
"elapsed_s": 0.0,
}
)
session = _ScriptedSession(
stdout_payload=f"some progress with {secret} embedded\n{payload}"
)
invoker = RemoteCacheProvisionerInvoker(logger=logger)
invoker.invoke(session, _request(), secrets_to_redact=[secret])
for record in records:
for value in record.__dict__.values():
if isinstance(value, dict):
for v in value.values():
if isinstance(v, str):
assert secret not in v
if secret in "some progress" or REDACTED_PLACEHOLDER in v:
pass
class TestParseFailures:
def test_non_zero_exit_code_raises_parse_error(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, _ = captured_logs
session = _ScriptedSession(
stdout_payload="some garbage", stderr_payload="oom killed", exit_code=137
)
invoker = RemoteCacheProvisionerInvoker(logger=logger)
with pytest.raises(BuildReportParseError):
invoker.invoke(session, _request())
def test_garbage_last_line_raises_parse_error(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, _ = captured_logs
session = _ScriptedSession(stdout_payload="not json")
invoker = RemoteCacheProvisionerInvoker(logger=logger)
with pytest.raises(BuildReportParseError):
invoker.invoke(session, _request())
def test_unknown_outcome_raises_parse_error(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, _ = captured_logs
session = _ScriptedSession(stdout_payload='{"outcome": "weird"}')
invoker = RemoteCacheProvisionerInvoker(logger=logger)
with pytest.raises(BuildReportParseError):
invoker.invoke(session, _request())
class TestCommandConstruction:
def test_command_pipes_json_request_to_companion_entry(
self, captured_logs: tuple[logging.Logger, list[logging.LogRecord]]
) -> None:
logger, _ = captured_logs
payload = json.dumps(
{
"outcome": "success",
"engines_built": 0,
"engines_reused": 0,
"descriptors_generated": 0,
"manifest_hash": "h",
"failure_reason": None,
"elapsed_s": 0.0,
}
)
session = _ScriptedSession(stdout_payload=payload)
invoker = RemoteCacheProvisionerInvoker(logger=logger)
invoker.invoke(session, _request())
# Expect the printf-pipe construct that feeds JSON via stdin.
assert session.captured_command is not None
assert "azaion-onboard c10 build" in session.captured_command
assert "--json-output" in session.captured_command
assert "--request-stdin" in session.captured_command
assert "printf" in session.captured_command