mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 08:41:12 +00:00
ba70381346
ci/woodpecker/push/02-build-push Pipeline failed
- Changed paths in documentation and configuration files to reflect the new naming convention for the NetVLAD model, transitioning from `models/netvlad/netvlad.pt` to `models/net_vlad/net_vlad.pt`. - Updated the `.gitignore` to include additional file types and directories related to input data and locally-generated evidence frames. - Removed the old NetVLAD checkpoint file as part of the transition to the new naming scheme. These changes ensure consistency across the project and improve the management of generated files.
729 lines
27 KiB
Python
729 lines
27 KiB
Python
"""Pytest fixtures for the AZ-404 E2E replay tests.

The fixtures are import-clean on dev macOS — the heavy work
(synthesizing the tlog, invoking the airborne CLI in a subprocess)
runs only when ``RUN_REPLAY_E2E=1`` is set in the environment.
Without the env var, the test module's collection-time skip marker
prevents the fixtures from being requested.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from collections.abc import Iterator
|
|
import dataclasses
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from gps_denied_onboard.replay_input import load_tlog_ground_truth
|
|
from tests.e2e.replay._helpers import GroundTruthRow, load_ground_truth_csv
|
|
from tests.e2e.replay._tlog_synth import synthesize_tlog
|
|
|
|
|
|
# Clip length (seconds) reserved for the realtime-pacing test.  The whole
# Derkachi flight lasts ~490 s, so pacing all of it at wall-clock speed
# would cost CI roughly 8 minutes.  The realtime test hands the CLI
# ``--max-duration-s`` so that only this short clip runs in real time.
_REALTIME_TEST_CLIP_S: float = 60.0
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Path helpers
|
|
|
|
|
|
def _repo_root() -> Path:
    """Return the directory three levels above the folder holding this file."""
    this_file = Path(__file__).resolve()
    return this_file.parents[3]
|
|
|
|
|
|
def _derkachi_dir() -> Path:
    """Return the Derkachi flight input-data directory under the repo root."""
    return _repo_root().joinpath(
        "_docs", "00_problem", "input_data", "flight_derkachi"
    )
|
|
|
|
|
|
def _calibration_path() -> Path:
    """Pick the camera-calibration JSON handed to the replay CLI.

    AZ-702 ships a factory-sheet approximation for the Topotek KHP20S30
    nadir camera as ``khp20s30_factory.json`` inside the Derkachi input
    directory.  That file wins whenever it exists.  If it is absent, the
    ``adti26.json`` placeholder under ``tests/fixtures/calibration`` is
    returned instead, so the AC-1/2/5/6 path stays exercisable on dev
    macOS without the AZ-702 deliverable.
    """
    preferred = _derkachi_dir() / "khp20s30_factory.json"
    if not preferred.is_file():
        return _repo_root() / "tests" / "fixtures" / "calibration" / "adti26.json"
    return preferred
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Fixtures
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class DerkachiReplayInputs:
    """Immutable bundle of the paths the AZ-402 CLI needs for a Derkachi replay.

    Instances are produced by the ``derkachi_replay_inputs`` fixture and
    consumed by ``replay_runner``, which turns the paths into CLI flags.
    """

    # Flight video (forwarded as ``--video``).
    video_path: Path
    # Telemetry log, either recorded or synthesized (forwarded as ``--tlog``).
    tlog_path: Path
    # IMU CSV (forwarded as ``--imu``).
    imu_csv_path: Path
    # Camera calibration JSON (forwarded as ``--camera-calibration``).
    calibration_path: Path
    # Replay YAML configuration (forwarded as ``--config``).
    config_path: Path
    # MAVLink signing key file (forwarded as ``--mavlink-signing-key``).
    signing_key_path: Path
    # Base output location; the runner derives per-invocation names from it.
    output_path: Path
    # Ground-truth rows loaded from the tlog or from the IMU CSV.
    ground_truth: list[GroundTruthRow]
|
|
|
|
|
|
@pytest.fixture(scope="session")
def derkachi_replay_inputs(tmp_path_factory: pytest.TempPathFactory) -> DerkachiReplayInputs:
    """Assemble the Derkachi replay inputs once per test session.

    The flight video must exist, otherwise the fixture fails.  When the
    recorded ``derkachi.tlog`` is present it is used as-is and the ground
    truth is read from it.  Otherwise a tlog is synthesized from
    ``data_imu.csv`` into a directory obtained from
    ``tmp_path_factory.mktemp("derkachi")`` and the ground truth comes from
    the CSV.  Each pytest invocation therefore gets a fresh synthesized
    copy; per the original author the synthesizer is fast enough (~1 s for
    60 s of data) that caching across invocations is unnecessary.

    The same scratch directory also receives a signing-key file and the
    replay ``config.yaml``.
    """
    flight_dir = _derkachi_dir()
    imu_csv = flight_dir / "data_imu.csv"
    video = flight_dir / "flight_derkachi.mp4"
    recorded_tlog = flight_dir / "derkachi.tlog"
    if not video.is_file():
        pytest.fail(f"Derkachi fixture missing: {video}")

    scratch = tmp_path_factory.mktemp("derkachi")

    # AZ-697: the real binary tlog wins when it is checked out.  Dev
    # environments without the 5.8 MB blob fall back to a tlog synthesized
    # from the CSV so they still exercise the e2e path.
    if recorded_tlog.is_file():
        tlog = recorded_tlog
        ground_truth_rows = [
            GroundTruthRow(
                t_s=sample.ts_ns / 1e9,
                lat_deg=sample.lat_deg,
                lon_deg=sample.lon_deg,
                alt_m=sample.alt_m,
            )
            for sample in load_tlog_ground_truth(recorded_tlog).records
        ]
    else:
        if not imu_csv.is_file():
            pytest.fail(
                f"Derkachi fixture missing: {imu_csv} — see "
                "_docs/00_problem/input_data/flight_derkachi/README.md"
            )
        tlog = scratch / "synth.tlog"
        synthesize_tlog(imu_csv, tlog)
        ground_truth_rows = load_ground_truth_csv(imu_csv)

    # All-zero 32-byte key.  The airborne replay path runs the signing
    # handshake against `NoopMavlinkTransport`, so the key contents do not
    # affect any wire output; a real file is still needed because the
    # CLI's path-validation gate requires one.
    key_file = scratch / "signing_key.bin"
    key_file.write_bytes(b"\x00" * 32)

    # Replay-specific overrides; everything else comes from the env vars
    # that the airborne binary's `load_config` honours by default.
    #
    # The per-component blocks sit at the TOP LEVEL on purpose: the YAML
    # loader in `gps_denied_onboard.config.loader._load_yaml_files` treats
    # each top-level mapping as a block keyed by a registry slug, and a
    # `components:` wrapper would be dropped silently because the wrapper
    # itself is not a registered slug.
    #
    # Open-loop ESKF composition profile (AZ-776 / ADR-012):
    # `c4_pose.enabled = false` removes C4 from the composition graph so
    # the mandatory simple baseline (KLT/RANSAC VIO + ESKF state
    # estimator) runs end-to-end without a C4 anchor.  ESKF has no iSAM2
    # graph for C4 to anchor against, and the `compose_root` validation
    # gate rejects the off-diagonal pairings (`enabled=False` +
    # `gtsam_isam2`, or `enabled=True` + `eskf`) with a `CompositionError`.
    # Position drifts open-loop without C2/C3/C4 satellite re-anchoring;
    # AZ-777 (Derkachi C6 reference tile cache) is the follow-up that
    # closes the satellite-anchoring half of the per-frame loop.
    replay_yaml = (
        "mode: replay\n"
        "replay:\n"
        " pace: asap\n"
        " target_fc_dialect: ardupilot_plane\n"
        "c1_vio:\n"
        " strategy: klt_ransac\n"
        "c4_pose:\n"
        " enabled: false\n"
        "c5_state:\n"
        " strategy: eskf\n"
    )
    config_file = scratch / "config.yaml"
    config_file.write_text(replay_yaml)

    return DerkachiReplayInputs(
        video_path=video,
        tlog_path=tlog,
        imu_csv_path=imu_csv,
        calibration_path=_calibration_path(),
        config_path=config_file,
        signing_key_path=key_file,
        output_path=scratch / "estimator_output.jsonl",
        ground_truth=ground_truth_rows,
    )
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class ReplayRunResult:
    """Immutable record of one ``gps-denied-replay`` subprocess invocation."""

    # Exit status reported by the subprocess.
    returncode: int
    # Captured standard output, decoded as text.
    stdout: str
    # Captured standard error, decoded as text.
    stderr: str
    # Path that was passed to the CLI as ``--output`` for this invocation.
    output_path: Path
    # Elapsed time around the subprocess call, measured on a monotonic clock.
    wall_clock_s: float
|
|
|
|
|
|
@pytest.fixture
def replay_runner(derkachi_replay_inputs: DerkachiReplayInputs) -> Any:
    """Return a callable that invokes the ``gps-denied-replay`` console-script.

    The callable accepts keyword overrides for ``pace``,
    ``time_offset_ms``, ``skip_auto_sync`` (AZ-611) and
    ``max_duration_s``; everything else is taken from
    ``derkachi_replay_inputs``.  Output is written to a fresh path per
    invocation so determinism comparisons (AC-5) get two independent
    files.

    Derkachi is a mid-flight fixture (no take-off spike) and the only
    motion the video detector sees in the first 60 s is camera shake
    and scenery change — neither tlog nor video can produce a
    reliable auto-sync signal.  The synth tlog and the video share
    the same ``t=0`` anchor by construction (see ``_tlog_synth.py``),
    so the correct offset is exactly ``0``.  The defaults reflect
    that — heavy ACs pass ``time_offset_ms=0`` + ``skip_auto_sync=True``
    so the run never touches the AC-9 validator that would otherwise
    reject the fixture's false-positive video motion onset.

    The test is skipped when the console-script is found neither on
    ``PATH`` nor next to the running interpreter.
    """
    import time

    binary = shutil.which("gps-denied-replay")
    if binary is None:
        # Fall back to the directory of the running interpreter, which is
        # where a virtualenv installs console-scripts.
        venv_bin = Path(sys.executable).parent / "gps-denied-replay"
        if venv_bin.exists():
            binary = str(venv_bin)
    if binary is None:
        pytest.skip(
            "gps-denied-replay console-script not on PATH; "
            "install the package in the test venv"
        )

    invocation_count = 0

    def _run(
        *,
        pace: str = "asap",
        time_offset_ms: int | None = 0,
        skip_auto_sync: bool = True,
        max_duration_s: float | None = None,
    ) -> ReplayRunResult:
        """Run the CLI once and return its exit status, output and timing.

        Raises:
            subprocess.TimeoutExpired: when the run exceeds 180 seconds.
        """
        nonlocal invocation_count
        invocation_count += 1
        out_path = derkachi_replay_inputs.output_path.with_name(
            f"estimator_output_{invocation_count}.jsonl"
        )
        # This fixture is function-scoped, so the counter restarts at 1
        # for every test, while the directory belongs to the
        # session-scoped inputs.  Remove any file left behind by an
        # earlier test so a run that writes nothing cannot be mistaken
        # for one that produced output.
        out_path.unlink(missing_ok=True)

        argv = [
            binary,
            "--video",
            str(derkachi_replay_inputs.video_path),
            "--imu",
            str(derkachi_replay_inputs.imu_csv_path),
            "--output",
            str(out_path),
            "--camera-calibration",
            str(derkachi_replay_inputs.calibration_path),
            "--config",
            str(derkachi_replay_inputs.config_path),
            "--mavlink-signing-key",
            str(derkachi_replay_inputs.signing_key_path),
            "--pace",
            pace,
        ]
        # --tlog is deprecated under AZ-894 but we still forward it
        # when the tlog file exists, so the legacy-path e2e tests
        # (test_derkachi_real_tlog.py) keep exercising the deprecation
        # warning until AZ-895 deletes the path entirely.
        if derkachi_replay_inputs.tlog_path.is_file():
            argv.extend(["--tlog", str(derkachi_replay_inputs.tlog_path)])
        if time_offset_ms is not None:
            argv.extend(["--time-offset-ms", str(time_offset_ms)])
        if skip_auto_sync:
            argv.append("--skip-auto-sync")
        if max_duration_s is not None:
            argv.extend(["--max-duration-s", str(max_duration_s)])

        # Build-flag env vars required by the airborne factories for
        # the strategies the replay config selects (klt_ransac VIO +
        # ESKF state estimator).  Both default OFF in the factory
        # gates — opt them in explicitly so the eager
        # `_build_c5_state_estimator_pair` and the lazy c1_vio
        # factory find their gating flags ON.
        run_env = {
            **os.environ,
            "BUILD_KLT_RANSAC": "ON",
            "BUILD_STATE_ESKF": "ON",
        }

        t0 = time.monotonic()
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=180,
            env=run_env,
        )
        wall_s = time.monotonic() - t0
        return ReplayRunResult(
            returncode=completed.returncode,
            stdout=completed.stdout,
            stderr=completed.stderr,
            output_path=out_path,
            wall_clock_s=wall_s,
        )

    return _run
|
|
|
|
|
|
@pytest.fixture(scope="session")
def operator_pre_flight_setup(
    derkachi_replay_inputs: DerkachiReplayInputs,
    tmp_path_factory: pytest.TempPathFactory,
) -> Iterator["PopulatedC6Cache"]:
    """Operator C12 pre-flight: real C1+C2+C11+C10 wiring (AZ-839 / Epic AZ-835 C3).

    Supersedes the AZ-404 placeholder.  The fixture runs the
    operator-side pre-flight pipeline end-to-end and yields the populated
    cache for AC-8 (operator workflow rehearsal) and the AZ-840 e2e
    orchestrator test.

    Skip gates, checked in this order by
    ``_operator_pre_flight_skip_reason`` (first match wins):

    * ``RUN_REPLAY_E2E`` is not one of ``1`` / ``true`` / ``yes`` / ``on``.
    * ``SATELLITE_PROVIDER_URL`` is unset or blank.
    * ``SATELLITE_PROVIDER_API_KEY`` is unset or blank.
    * ``BUILD_FAISS_INDEX`` is not one of ``on`` / ``1`` / ``true`` / ``yes``.
    * ``GPS_DENIED_OPERATOR_CONFIG_PATH`` is unset or blank.

    The wiring itself lives in ``_build_operator_pre_flight_cache``, which
    may skip further (for example when the config path is not a file).
    See ``tests/e2e/replay/_operator_pre_flight.py::populate_c6_from_route``
    for the population algorithm.
    """
    reason = _operator_pre_flight_skip_reason()
    if reason is not None:
        pytest.skip(reason)

    # Delegate to the builder generator so its teardown runs when this
    # fixture is finalised.
    builder = _build_operator_pre_flight_cache(
        derkachi_replay_inputs=derkachi_replay_inputs,
        tmp_path_factory=tmp_path_factory,
    )
    yield from builder
|
|
|
|
|
|
def _operator_pre_flight_skip_reason() -> str | None:
    """Return why the operator pre-flight fixture must skip, or ``None``.

    The environment is inspected in a fixed order and the message for the
    first unmet requirement is returned: ``RUN_REPLAY_E2E`` (boolean flag),
    ``SATELLITE_PROVIDER_URL``, ``SATELLITE_PROVIDER_API_KEY``,
    ``BUILD_FAISS_INDEX`` (boolean flag), then
    ``GPS_DENIED_OPERATOR_CONFIG_PATH``.  Boolean flags count as set when
    their stripped, lower-cased value is ``1``, ``true``, ``yes`` or
    ``on``.  Text variables count as set when non-blank after stripping.
    """
    enabled_values = ("1", "true", "yes", "on")

    def _text(name: str) -> str:
        # Missing variables behave like empty strings.
        return os.environ.get(name, "").strip()

    def _flag(name: str) -> bool:
        return _text(name).lower() in enabled_values

    if not _flag("RUN_REPLAY_E2E"):
        return "AZ-839 operator_pre_flight_setup gated by RUN_REPLAY_E2E=1"

    if not _text("SATELLITE_PROVIDER_URL"):
        return (
            "AZ-839 operator_pre_flight_setup requires SATELLITE_PROVIDER_URL "
            "(e.g. https://satellite-provider:8080)"
        )

    if not _text("SATELLITE_PROVIDER_API_KEY"):
        return (
            "AZ-839 operator_pre_flight_setup requires SATELLITE_PROVIDER_API_KEY "
            "(Bearer JWT for the parent-suite Route + Inventory APIs)"
        )

    if not _flag("BUILD_FAISS_INDEX"):
        return (
            "AZ-839 operator_pre_flight_setup requires BUILD_FAISS_INDEX=ON "
            "(the C6 FaissDescriptorIndex runtime is build-flag-gated per "
            "runtime_root.storage_factory)"
        )

    if not _text("GPS_DENIED_OPERATOR_CONFIG_PATH"):
        return (
            "AZ-839 operator_pre_flight_setup requires "
            "GPS_DENIED_OPERATOR_CONFIG_PATH pointing at a YAML that "
            "registers c6_tile_cache + c7_inference + c10_provisioning + "
            "c11_tile_manager blocks (Jetson e2e harness sets this; "
            "dev macOS does not)"
        )

    return None
|
|
|
|
|
|
def _build_operator_pre_flight_cache(
    *,
    derkachi_replay_inputs: DerkachiReplayInputs,
    tmp_path_factory: pytest.TempPathFactory,
) -> Iterator["PopulatedC6Cache"]:
    """Wire the operator-side runtime graph and run the AZ-839 driver.

    All imports of heavy collaborators (httpx, runtime_root factories,
    c10/c11/c6 modules) live inside this function so collection on
    dev macOS without the e2e env stays cheap (the SKIP path returns
    before reaching this body).

    The function is a generator: it yields the populated cache exactly
    once and closes the HTTP client afterwards.  The client is also
    closed when any step after its construction raises or skips, so a
    failed or skipped setup does not leak the connection pool.

    Args:
        derkachi_replay_inputs: Source of the tlog the route is
            extracted from.
        tmp_path_factory: Used to create the ``operator_pre_flight_cache``
            directory that all cache artefacts are written to.

    Yields:
        The value returned by ``populate_c6_from_route``.

    Raises:
        pytest.skip.Exception: when an env-flagged dependency
            (e.g. ``c10_provisioning`` config block, route extraction)
            cannot be satisfied and re-running with the right env is
            the right next step.
        KeyError: when ``GPS_DENIED_OPERATOR_CONFIG_PATH``,
            ``SATELLITE_PROVIDER_URL`` or ``SATELLITE_PROVIDER_API_KEY``
            is absent from the environment.
    """
    import httpx

    from gps_denied_onboard.clock.wall_clock import WallClock
    from gps_denied_onboard.config.loader import load_config
    from gps_denied_onboard.replay_input.tlog_route import (
        extract_route_from_tlog,
    )
    from gps_denied_onboard.runtime_root.c10_factory import (
        build_descriptor_batcher,
        build_engine_compiler,
    )
    from gps_denied_onboard.runtime_root.c11_factory import (
        build_tile_downloader,
    )
    from gps_denied_onboard.runtime_root.storage_factory import (
        build_descriptor_index,
        build_tile_metadata_store,
        build_tile_store,
    )

    from tests.e2e.replay._operator_pre_flight import (
        populate_c6_from_route,
    )

    config_path = Path(os.environ["GPS_DENIED_OPERATOR_CONFIG_PATH"])
    if not config_path.is_file():
        pytest.skip(
            f"GPS_DENIED_OPERATOR_CONFIG_PATH points at a non-file: {config_path}"
        )
    config = load_config(os.environ, paths=[config_path])

    cache_root = tmp_path_factory.mktemp("operator_pre_flight_cache")
    # PostgresFilesystemStore writes JPEGs under `<root_dir>/tiles/`;
    # FaissDescriptorIndex falls back to `<root_dir>/descriptor.index`
    # when `faiss_index_path` is empty.  Override the c6_tile_cache
    # block in-memory so the production components built below
    # (build_tile_store / build_descriptor_index / batcher) write to
    # the same `cache_root` PopulatedC6Cache advertises.  Without this
    # the static YAML at GPS_DENIED_OPERATOR_CONFIG_PATH would route
    # writes to its baked-in `root_dir` while the verifier read from
    # the fixture's tmp path, breaking AC-3 / AC-6 on Tier-2.
    c6_block = config.components["c6_tile_cache"]
    c6_block_overridden = dataclasses.replace(
        c6_block,
        root_dir=str(cache_root),
        faiss_index_path="",
    )
    config = dataclasses.replace(
        config,
        components={**config.components, "c6_tile_cache": c6_block_overridden},
    )
    tile_store_path = cache_root / "tiles"
    faiss_index_path = cache_root / "descriptor.index"

    route_spec = extract_route_from_tlog(
        derkachi_replay_inputs.tlog_path,
        max_waypoints=10,
    )

    sp_url = os.environ["SATELLITE_PROVIDER_URL"].strip()
    sp_jwt = os.environ["SATELLITE_PROVIDER_API_KEY"].strip()
    tls_insecure = os.environ.get(
        "SATELLITE_PROVIDER_TLS_INSECURE", ""
    ).strip().lower() in {"1", "true", "yes", "on"}

    from gps_denied_onboard.components.c11_tile_manager.route_client import (
        SatelliteProviderRouteClient,
    )

    route_client = SatelliteProviderRouteClient(
        base_url=sp_url,
        jwt=sp_jwt,
        tls_insecure=tls_insecure,
    )

    tile_store = build_tile_store(config)
    tile_metadata_store = build_tile_metadata_store(config)
    # AZ-964: FaissDescriptorIndex._load() requires the .index +
    # .sha256 + .meta.json triplet to exist on disk before the factory
    # returns.  populate_c6_from_route (below) builds the real index
    # once route tiles are downloaded; until then, seed an empty
    # HNSW32 fixture so the factory call succeeds.
    from tests.e2e.replay._faiss_seed import seed_empty_faiss_index

    seed_empty_faiss_index(cache_root)
    descriptor_index = build_descriptor_index(config)

    def _descriptor_index_factory() -> Any:
        """Build a new FaissDescriptorIndex bound to this run's index path."""
        from gps_denied_onboard.components.c6_tile_cache.faiss_descriptor_index import (  # noqa: E501
            FaissDescriptorIndex,
        )
        from gps_denied_onboard.helpers.sha256_sidecar import Sha256Sidecar
        from gps_denied_onboard.logging import get_logger

        return FaissDescriptorIndex(
            index_path=faiss_index_path,
            sidecar=Sha256Sidecar(),
            logger=get_logger("c6_tile_cache.faiss_descriptor_index"),
        )

    httpx_client = httpx.Client(
        verify=not tls_insecure,
        timeout=httpx.Timeout(30.0),
        headers={"Authorization": f"Bearer {sp_jwt}"},
    )
    # Everything after the client exists runs under try/finally: the
    # embedder build below can raise or call pytest.skip, and the client
    # must be closed on those paths as well as after the yield.
    try:
        tile_downloader = build_tile_downloader(
            config,
            http_client=httpx_client,
            tile_store=tile_store,
            tile_metadata_store=tile_metadata_store,
            budget_enforcer=tile_store,
        )

        clock = WallClock()
        engine_compiler = build_engine_compiler(config)
        backbone_embedder = _build_replay_backbone_embedder(
            config=config,
            engine_compiler=engine_compiler,
            cache_root=cache_root,
        )

        descriptor_batcher = build_descriptor_batcher(
            config,
            backbone_embedder=backbone_embedder,
            tile_metadata_store=tile_metadata_store,
            tile_store=tile_store,
            descriptor_index=descriptor_index,
            clock=clock,
        )

        populated = populate_c6_from_route(
            route_spec=route_spec,
            route_client=route_client,
            tile_downloader=tile_downloader,
            descriptor_batcher=descriptor_batcher,
            descriptor_index_factory=_descriptor_index_factory,
            cache_root=cache_root,
            tile_store_path=tile_store_path,
            faiss_index_path=faiss_index_path,
        )
        yield populated
    finally:
        httpx_client.close()
|
|
|
|
|
|
def _build_replay_backbone_embedder(
    *,
    config: Any,
    engine_compiler: Any,
    cache_root: Path,
) -> Any:
    """Compile the configured backbones and wrap the first for the AZ-322 batcher.

    The replay-mode operator binary does not exist yet (tracked under
    Epic AZ-835), so this fixture helper does the wiring inline, using
    the production factories:

    * ``build_backbone_specs(config)`` supplies the backbone specs.
      An empty list skips the test.
    * ``engine_compiler.compile_engines_for_corpus`` compiles them into
      ``<cache_root>/engines``.  An empty result skips the test.
    * The first compile result's ``entry`` is deserialised by the
      inference runtime into the engine handle the embedder consumes.
    * ``_default_tile_decoder`` is installed as the embedder's tile
      decoder.

    Dev workstations without a backbone ONNX or a working inference
    runtime fail inside this function, which surfaces as a fixture error.
    That is deliberate: the environment skip gate is meant to catch the
    env-mismatch case before execution gets here.
    """
    from gps_denied_onboard._types.inference import PrecisionMode
    from gps_denied_onboard._types.manifests import HostCapabilities
    from gps_denied_onboard.components.c10_provisioning.c7_engine_embedder import (
        C7EngineBackboneEmbedder,
    )
    from gps_denied_onboard.components.c10_provisioning.engine_compiler import (
        EngineCompileRequest,
    )
    from gps_denied_onboard.logging import get_logger
    from gps_denied_onboard.runtime_root.c10_factory import (
        build_backbone_specs,
    )
    from gps_denied_onboard.runtime_root.inference_factory import (
        build_inference_runtime,
    )

    specs = build_backbone_specs(config)
    if not specs:
        pytest.skip(
            "AZ-839 operator_pre_flight_setup: config has no "
            "c10_provisioning.backbones entries — the e2e harness "
            "config must declare at least one backbone (typically "
            "DINOv2-VPR or NetVLAD per AZ-321)."
        )

    # Host description handed to the compiler; the values are fixed here.
    target_host = HostCapabilities(sm=87, jetpack="6.2", trt="10.3")
    engines_dir = cache_root / "engines"
    engines_dir.mkdir(parents=True, exist_ok=True)
    compile_request = EngineCompileRequest(
        backbones=specs,
        calibration_path=None,
        cache_root=engines_dir,
        precision=PrecisionMode.FP16,
        host=target_host,
        workspace_mb=int(
            config.components["c10_provisioning"].workspace_mb
        ),
    )
    compiled = engine_compiler.compile_engines_for_corpus(compile_request)
    if not compiled:
        pytest.skip(
            "AZ-839 operator_pre_flight_setup: engine compiler returned "
            "empty results — corpus failed to compile."
        )

    # Only the first result / first spec pair is wired into the embedder.
    first_result = compiled[0]
    first_spec = specs[0]
    runtime = build_inference_runtime(config)
    dim = _resolve_replay_descriptor_dim(config, first_spec)
    # The c10 engine compiler treats backbones generically and knows
    # nothing about c2_vpr's architecture registry.  The c2_vpr factory
    # would register the architecture on its own create() path, but this
    # helper bypasses build_vpr_strategy, so the registration happens
    # here to let deserialize_engine find it.
    _register_replay_strategy_architecture(config, dim)
    handle = runtime.deserialize_engine(first_result.entry)
    return C7EngineBackboneEmbedder(
        inference_runtime=runtime,
        engine_handle=handle,
        input_name=first_spec.input_name,
        output_name="descriptor",
        descriptor_dim=dim,
        tile_decoder=_default_tile_decoder,
        logger=get_logger("c10_provisioning.replay_backbone_embedder"),
    )
|
|
|
|
|
|
def _register_replay_strategy_architecture(
    config: Any, descriptor_dim: int
) -> None:
    """Register the NetVLAD architecture with c7's registry when configured.

    Production runs go through ``vpr_factory.build_vpr_strategy``, which
    performs this registration as a side effect before the strategy is
    bound.  The AZ-839 fixture builds engines through c10 directly and
    never calls ``build_vpr_strategy``, so without this helper
    ``InferenceRuntime.deserialize_engine`` raises
    ``EngineDeserializeError: No architecture registered for
    model_name='net_vlad'``.

    Nothing happens unless ``config.components['c2_vpr'].strategy`` equals
    ``"net_vlad"``; a missing components mapping, a missing block or any
    other strategy is a no-op.
    """
    components = config.components
    vpr_block = components.get("c2_vpr") if components else None
    # getattr on a missing (None) block simply yields the default.
    selected = getattr(vpr_block, "strategy", None)
    if selected == "net_vlad":
        # Imported lazily so the no-op path never touches these packages.
        from gps_denied_onboard.components.c2_vpr.net_vlad import (
            MODEL_NAME,
            architecture_factory,
        )
        from gps_denied_onboard.components.c7_inference import register_architecture

        register_architecture(MODEL_NAME, architecture_factory(descriptor_dim))
|
|
|
|
|
|
def _resolve_replay_descriptor_dim(config: Any, spec: Any) -> int:
    """Return the descriptor dimension for the AZ-839 NetVLAD baseline.

    The AZ-839 task spec pins the C2 backbone at NetVLAD, so the value
    is read from ``netvlad_descriptor_dim`` on the ``c2_vpr`` config block.
    If that block selects ``net_vlad`` but lacks the attribute, the
    default ``4096`` applies.

    When the ``c2_vpr`` block is absent or selects any other strategy the
    test is skipped: other backbones (UltraVPR=512, MegaLoc=2048,
    MixVPR=4096) would need a different resolver, which is out of scope
    for AZ-839.  ``spec.model_name`` is only read to build the skip
    message.
    """
    components = config.components
    vpr_block = components.get("c2_vpr") if components else None

    if vpr_block is None or getattr(vpr_block, "strategy", "") != "net_vlad":
        seen_strategy = getattr(vpr_block, "strategy", "<missing>")
        pytest.skip(
            "AZ-839 operator_pre_flight_setup: descriptor_dim resolver "
            f"only supports c2_vpr.strategy='net_vlad'; got "
            f"{seen_strategy!r} on backbone "
            f"{spec.model_name!r}. See AZ-839 spec § Out of scope."
        )
        # Keeps the control flow explicit: pytest.skip always raises.
        raise AssertionError("unreachable: pytest.skip raises")

    return int(getattr(vpr_block, "netvlad_descriptor_dim", 4096))
|
|
|
|
|
|
def _default_tile_decoder(handle: Any) -> Any:
    """Decode an encoded tile image into a channel-first float32 array.

    ``handle`` is expected to be a C6 ``TilePixelHandle``.  Its bytes are
    taken from ``read_bytes()`` when that attribute exists; otherwise the
    handle is used as a context manager and ``read()`` is called on the
    opened object.  The bytes are decoded with OpenCV as a colour image,
    converted from BGR to RGB, transposed from HWC to CHW and scaled by
    ``1 / 255``.

    OpenCV and NumPy are imported lazily so that importing this module
    does not require them.

    Raises:
        RuntimeError: when OpenCV cannot decode the bytes.
    """
    import cv2
    import numpy as np

    if not hasattr(handle, "read_bytes"):
        with handle as opened:
            payload = opened.read()
    else:
        payload = handle.read_bytes()

    encoded = np.frombuffer(payload, dtype=np.uint8)
    decoded_bgr = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if decoded_bgr is None:
        raise RuntimeError("cv2.imdecode returned None for tile handle")

    decoded_rgb = cv2.cvtColor(decoded_bgr, cv2.COLOR_BGR2RGB)
    channels_first = np.transpose(decoded_rgb, (2, 0, 1))
    return channels_first.astype(np.float32) / 255.0
|