Files
gps-denied-onboard/tests/e2e/replay/test_operator_pre_flight_driver.py
Oleksandr Bezdieniezhnykh fd52cc9b1d [AZ-845][AZ-846][AZ-847] Refactor 02: relocate RouteSpec + widen lint
Cycle-3 refactor run 02-az507 (RouteSpec relocation + module-layout
refresh + AZ-270 lint widening). Single batch of 3 tasks; epic AZ-844.

AZ-845 — Relocate RouteSpec DTO to _types/route.py (rule-9 fix):
  * New canonical home: src/gps_denied_onboard/_types/route.py
    (frozen+slots dataclass; full docstring carried over verbatim).
  * c11_tile_manager/route_client.py imports from _types.route.
  * replay_input/tlog_route.py and replay_input/__init__.py keep
    re-exports for backward-compat (RouteSpec in __all__).
  * 5 test files updated to import from _types.route for symmetry.
  * Identity-preserving re-export verified by new test
    test_az845_routespec_canonical_home_and_reexport_identity.

AZ-846 — Refresh module-layout.md cycle-3 entries:
  * c11_tile_manager Internal list rewritten with all 8 internals
    (alphabetised) — corrects a stale entry that referenced files
    (satellite_provider_*.py) that no longer exist.
  * shared/replay_input file list adds errors.py (cycle-2 carry),
    tlog_ground_truth.py (cycle-2 carry), tlog_route.py (cycle-3 NEW).
  * shared/_types section registers route.py with provenance line.
  * Out-of-scope cycle-2 carry-overs (replay_api/, cli/render_map.py,
    helpers/gps_compare.py, etc.) intentionally untouched.

AZ-847 — Widen test_az270 lint to enforce full rule-9 allow-list:
  * test_ac6_only_compose_root_imports_concrete_strategies now walks
    every components/<X>/*.py ImportFrom/Import and rejects anything
    not in the rule-9 allow-list (own subpackage + _types + helpers
    + config/logging/fdr_client/clock + frame_source interface-only).
  * Strict superset of the original AC-6 narrow check.
  * Reports zero violations on the codebase post-AZ-845.
  * Two principled carve-outs documented in the test docstring:
    - components/<X>/bench/** path skip (measurement code legitimately
      constructs production strategies via runtime_root factories).
    - register_* lazy self-registration imports from
      runtime_root.<X>_factory (central-registry plugin pattern).
  * Both carve-outs surfaced to user via Choose A/B/C/D Risk-1
    protocol; user skipped both — agent proceeded with documented
    defaults. Doc-only follow-up tracked in
    _docs/_process_leftovers/2026-05-24_az847_rule9_wording_followup.md
    for rule-9 wording update in module-layout.md.

Test results: 2287 passed, 90 skipped (environmental — Docker / CUDA
/ TensorRT / Jetson hardware / fixtures), 0 failed. Focused subset
(replay_input/ + c11_tile_manager/ + test_az270_compose_root.py)
also clean: 169 passed, 1 skipped.

Tracker: AZ-845/846/847 transitioned In Progress -> In Testing.
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 10:07:20 +03:00

481 lines
16 KiB
Python

"""Unit tests for ``populate_c6_from_route`` (AZ-839 AC-8).
Covers the AZ-839 acceptance criteria that can be exercised against
stubbed dependencies (the AC-9 integration test against the Jetson
harness lives in ``test_derkachi_real_tlog.py`` once Epic AZ-835
completes):
* AC-3 happy path — driver returns a populated cache with paths
pointing at the on-disk sidecar triple.
* AC-4 — :class:`RouteValidationError` and
:class:`RouteTerminalFailureError` propagate unchanged with their
original cause; no silent swallow.
* AC-5 — :class:`RouteTransientError` triggers retry up to 3 attempts
using the documented backoff schedule. Final attempt's exception is
propagated unchanged.
* AC-6 — Tamper between rebuild and verify (simulated by having
``descriptor_index_factory`` raise :class:`IndexUnavailableError`)
surfaces the failure and leaves no half-built artifacts.
* AC-7 — Cleanup on failure removes any sidecar file the driver
produced (pre-existing files are preserved).
The driver intentionally takes every collaborator via dependency
injection so this module never imports httpx, FAISS, or Postgres.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock
from uuid import uuid4
import pytest
from gps_denied_onboard.components.c10_provisioning.descriptor_batcher import (
BatcherOutcome,
DescriptorBatchReport,
)
from gps_denied_onboard.components.c11_tile_manager import (
DownloadOutcome,
SectorClassification,
)
from gps_denied_onboard.components.c11_tile_manager._types import (
DownloadBatchReport,
)
from gps_denied_onboard.components.c11_tile_manager.errors import (
RouteTerminalFailureError,
RouteTransientError,
RouteValidationError,
)
from gps_denied_onboard.components.c11_tile_manager.route_client import (
RouteSeedResult,
)
from gps_denied_onboard.components.c6_tile_cache.errors import (
IndexUnavailableError,
)
from gps_denied_onboard.components.c6_tile_cache.faiss_descriptor_index import (
META_SUFFIX,
)
from gps_denied_onboard.helpers.sha256_sidecar import SIDECAR_SUFFIX
from gps_denied_onboard._types.route import RouteSpec
from tests.e2e.replay._operator_pre_flight import (
PopulatedC6Cache,
populate_c6_from_route,
)
# ----------------------------------------------------------------------
# Helpers
@dataclass
class _DriverHarness:
"""Bundle of paths + collaborators wired into one driver call."""
cache_root: Path
tile_store_path: Path
faiss_index_path: Path
sha256_path: Path
meta_path: Path
route_spec: RouteSpec
route_client: MagicMock
tile_downloader: MagicMock
descriptor_batcher: MagicMock
descriptor_index_factory: MagicMock
sleep_calls: list[float]
def _build_harness(tmp_path: Path) -> _DriverHarness:
"""Wire a self-contained harness with sane default stub returns.
Each collaborator is a :class:`MagicMock` with a default success
return value; tests override per-call as needed.
"""
cache_root = tmp_path / "cache_root"
cache_root.mkdir()
tile_store_path = cache_root / "tile_store"
tile_store_path.mkdir()
faiss_index_path = cache_root / "descriptor.index"
sha256_path = Path(str(faiss_index_path) + SIDECAR_SUFFIX)
meta_path = Path(str(faiss_index_path) + META_SUFFIX)
route_spec = RouteSpec(
waypoints=(
(50.10, 36.10),
(50.11, 36.11),
(50.12, 36.12),
),
suggested_region_size_meters=500.0,
source_tlog=Path("test.tlog"),
source_segment=(0, 100),
total_distance_meters=1500.0,
)
route_client = MagicMock()
route_client.seed_route.return_value = RouteSeedResult(
route_id=uuid4(),
terminal_status="completed",
maps_ready=True,
tile_count=12,
elapsed_ms=2500,
submitted_payload_sha256="cafebabe" * 8,
)
tile_downloader = MagicMock()
tile_downloader.download_tiles_for_area.return_value = DownloadBatchReport(
outcome=DownloadOutcome.SUCCESS,
tiles_requested=12,
tiles_downloaded=12,
tiles_rejected_resolution=0,
tiles_rejected_freshness=0,
tiles_downgraded=0,
retry_count=0,
request_hash="abcdef0123456789",
)
descriptor_batcher = MagicMock()
descriptor_batcher.populate_descriptors.return_value = DescriptorBatchReport(
descriptors_generated=12,
tiles_consumed=12,
oom_retries=0,
elapsed_s=1.2,
outcome=BatcherOutcome.SUCCESS,
failure_reason=None,
)
descriptor_index_factory = MagicMock()
descriptor_index_factory.return_value = MagicMock(
spec=["mmap_handle", "descriptor_dim"]
)
return _DriverHarness(
cache_root=cache_root,
tile_store_path=tile_store_path,
faiss_index_path=faiss_index_path,
sha256_path=sha256_path,
meta_path=meta_path,
route_spec=route_spec,
route_client=route_client,
tile_downloader=tile_downloader,
descriptor_batcher=descriptor_batcher,
descriptor_index_factory=descriptor_index_factory,
sleep_calls=[],
)
def _drive(harness: _DriverHarness, **overrides: object) -> PopulatedC6Cache:
"""Invoke the driver with the harness defaults plus any overrides."""
kwargs: dict[str, object] = {
"route_spec": harness.route_spec,
"route_client": harness.route_client,
"tile_downloader": harness.tile_downloader,
"descriptor_batcher": harness.descriptor_batcher,
"descriptor_index_factory": harness.descriptor_index_factory,
"cache_root": harness.cache_root,
"tile_store_path": harness.tile_store_path,
"faiss_index_path": harness.faiss_index_path,
"sleep": harness.sleep_calls.append,
}
kwargs.update(overrides)
return populate_c6_from_route(**kwargs) # type: ignore[arg-type]
# ----------------------------------------------------------------------
# AC-3 — happy path
def test_populate_c6_from_route_returns_populated_cache(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
# Act
populated = _drive(harness)
# Assert
assert isinstance(populated, PopulatedC6Cache)
assert populated.cache_root == harness.cache_root
assert populated.tile_store_path == harness.tile_store_path
assert populated.faiss_index_path == harness.faiss_index_path
assert populated.faiss_sidecar_sha256_path == harness.sha256_path
assert populated.faiss_sidecar_meta_path == harness.meta_path
assert populated.route_spec is harness.route_spec
assert populated.tile_count == 12
assert populated.elapsed_seconds >= 0.0
harness.route_client.seed_route.assert_called_once()
harness.tile_downloader.download_tiles_for_area.assert_called_once()
harness.descriptor_batcher.populate_descriptors.assert_called_once()
harness.descriptor_index_factory.assert_called_once()
def test_populate_c6_from_route_passes_sector_class_to_downloader(
tmp_path: Path,
) -> None:
# Arrange
harness = _build_harness(tmp_path)
# Act
_drive(harness, sector_class=SectorClassification.STABLE_REAR)
# Assert
download_request = harness.tile_downloader.download_tiles_for_area.call_args.args[0]
assert download_request.sector_class is SectorClassification.STABLE_REAR
corpus_filter = harness.descriptor_batcher.populate_descriptors.call_args.args[0]
assert corpus_filter.sector_class == SectorClassification.STABLE_REAR.value
# ----------------------------------------------------------------------
# AC-4 — validation / terminal failure propagate unchanged
def test_route_validation_error_propagates_unchanged(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
def _raise_validation(*_args: object, **_kwargs: object) -> RouteSeedResult:
try:
raise ValueError("payload sha256 mismatch")
except ValueError as cause:
raise RouteValidationError("payload rejected") from cause
harness.route_client.seed_route.side_effect = _raise_validation
# Act + Assert
with pytest.raises(RouteValidationError) as exc_info:
_drive(harness)
assert isinstance(exc_info.value.__cause__, ValueError)
assert "payload sha256 mismatch" in str(exc_info.value.__cause__)
assert harness.tile_downloader.download_tiles_for_area.call_count == 0
assert harness.descriptor_batcher.populate_descriptors.call_count == 0
assert harness.sleep_calls == []
def test_route_terminal_failure_propagates_unchanged(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
harness.route_client.seed_route.side_effect = RouteTerminalFailureError(
"mapsReady never reached"
)
# Act + Assert
with pytest.raises(RouteTerminalFailureError):
_drive(harness)
assert harness.tile_downloader.download_tiles_for_area.call_count == 0
assert harness.descriptor_batcher.populate_descriptors.call_count == 0
assert harness.sleep_calls == []
# ----------------------------------------------------------------------
# AC-5 — transient retry budget
def test_route_transient_error_retries_then_succeeds(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
success_result = harness.route_client.seed_route.return_value
harness.route_client.seed_route.side_effect = [
RouteTransientError("503 first attempt"),
RouteTransientError("503 second attempt"),
success_result,
]
# Act
populated = _drive(
harness,
retry_schedule_s=(0.1, 0.2, 0.4),
max_retry_attempts=3,
)
# Assert
assert populated.tile_count == 12
assert harness.route_client.seed_route.call_count == 3
assert harness.sleep_calls == [pytest.approx(0.1), pytest.approx(0.2)]
def test_route_transient_error_exhausted_propagates_last_attempt(
tmp_path: Path,
) -> None:
# Arrange
harness = _build_harness(tmp_path)
final_exc = RouteTransientError("503 final attempt")
harness.route_client.seed_route.side_effect = [
RouteTransientError("503 a"),
RouteTransientError("503 b"),
final_exc,
]
# Act + Assert
with pytest.raises(RouteTransientError) as exc_info:
_drive(
harness,
retry_schedule_s=(0.1, 0.2),
max_retry_attempts=3,
)
assert exc_info.value is final_exc
assert harness.route_client.seed_route.call_count == 3
assert harness.sleep_calls == [pytest.approx(0.1), pytest.approx(0.2)]
assert harness.tile_downloader.download_tiles_for_area.call_count == 0
# ----------------------------------------------------------------------
# AC-6 — tamper between rebuild and verify
def test_descriptor_index_factory_index_unavailable_propagates(
tmp_path: Path,
) -> None:
# Arrange
harness = _build_harness(tmp_path)
# Simulate the rebuild writing sidecar files DURING populate_descriptors
# (the real C10 batcher does this via its DescriptorIndexRebuilder cut).
_stub_populate_descriptors_writes_sidecars(harness)
harness.descriptor_index_factory.side_effect = IndexUnavailableError(
"sidecar sha256 mismatch — index is corrupt"
)
# Act + Assert
with pytest.raises(IndexUnavailableError):
_drive(harness)
# ----------------------------------------------------------------------
# AC-7 — cleanup on failure
def test_cleanup_removes_partial_sidecar_files_on_failure(
tmp_path: Path,
) -> None:
# Arrange
harness = _build_harness(tmp_path)
# The driver MUST observe an absent-sidecar state on entry, then a
# rebuild that writes the trio, then a verifier that fails — only
# then is the cleanup contract exercised on a "we created these"
# set of paths.
assert not harness.faiss_index_path.exists()
_stub_populate_descriptors_writes_sidecars(harness)
harness.descriptor_index_factory.side_effect = IndexUnavailableError(
"tamper detected"
)
# Act
with pytest.raises(IndexUnavailableError):
_drive(harness)
# Assert
assert not harness.faiss_index_path.exists()
assert not harness.sha256_path.exists()
assert not harness.meta_path.exists()
def test_cleanup_preserves_pre_existing_warm_cache(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
# A warm cache existed before the driver ran (named-volume reuse path).
_write_dummy_sidecars(harness, marker="WARM_CACHE")
harness.route_client.seed_route.side_effect = RouteValidationError(
"noop fail post-warm-cache"
)
# Act
with pytest.raises(RouteValidationError):
_drive(harness)
# Assert — the pre-existing warm-cache files MUST stay on disk.
assert harness.faiss_index_path.read_text() == "WARM_CACHE"
assert harness.sha256_path.read_text() == "WARM_CACHE"
assert harness.meta_path.read_text() == "WARM_CACHE"
def test_batcher_failure_propagates_and_cleans_up(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
def _populate_writes_partial_sidecar_then_fails(
_filter: object,
) -> DescriptorBatchReport:
_write_dummy_sidecars(harness, marker="HALF_BUILT")
return DescriptorBatchReport(
descriptors_generated=0,
tiles_consumed=0,
oom_retries=0,
elapsed_s=0.5,
outcome=BatcherOutcome.FAILURE,
failure_reason="OOM at batch_size=64",
)
harness.descriptor_batcher.populate_descriptors.side_effect = (
_populate_writes_partial_sidecar_then_fails
)
# Act + Assert
with pytest.raises(RuntimeError) as exc_info:
_drive(harness)
assert "OOM at batch_size=64" in str(exc_info.value)
assert not harness.faiss_index_path.exists()
assert not harness.sha256_path.exists()
assert not harness.meta_path.exists()
def test_downloader_failure_propagates_and_cleans_up(tmp_path: Path) -> None:
# Arrange
harness = _build_harness(tmp_path)
harness.tile_downloader.download_tiles_for_area.return_value = (
DownloadBatchReport(
outcome=DownloadOutcome.FAILURE,
tiles_requested=12,
tiles_downloaded=0,
tiles_rejected_resolution=0,
tiles_rejected_freshness=0,
tiles_downgraded=0,
retry_count=2,
request_hash="abcdef0123456789",
)
)
# Act + Assert
with pytest.raises(RuntimeError) as exc_info:
_drive(harness)
assert "failure" in str(exc_info.value).lower()
assert harness.descriptor_batcher.populate_descriptors.call_count == 0
# ----------------------------------------------------------------------
# Internal helpers
def _write_dummy_sidecars(
harness: _DriverHarness,
*,
marker: str = "PARTIAL",
) -> None:
"""Create the three sidecar files at the harness's faiss path."""
harness.faiss_index_path.write_text(marker)
harness.sha256_path.write_text(marker)
harness.meta_path.write_text(marker)
def _stub_populate_descriptors_writes_sidecars(
harness: _DriverHarness,
*,
marker: str = "FRESH_REBUILD",
) -> None:
"""Make the stubbed batcher write the three sidecar files on success.
The real C10 batcher writes the FAISS index + sha256 + meta.json
via the AZ-306 :class:`FaissDescriptorIndex.rebuild_from_descriptors`
path. The stub mirrors that side effect so the AC-7 cleanup path
has files to rollback on a downstream verifier failure.
"""
success_report = harness.descriptor_batcher.populate_descriptors.return_value
def _populate(_filter: object) -> DescriptorBatchReport:
_write_dummy_sidecars(harness, marker=marker)
return success_report
harness.descriptor_batcher.populate_descriptors.side_effect = _populate