Files
Oleksandr Bezdieniezhnykh 702a0c0ff3 [AZ-408] [AZ-410] [AZ-411] Batch 69: synth injectors + FT-P-02/03/14
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators:
- outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset
- blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment;
  AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas
- multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage
- fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard
- _common.py: derive_rng + tile-manifest reader + tmpfs helpers
- injector_fixtures.py: pytest fixtures wired via runner conftest

AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors:
- anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction,
  AC-4 monotonicity check, CSV evidence
- test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper
  NotImplementedError (frame_source_replay / fdr_reader / imu_replay)

AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84:
- estimate_schema.py: AC-1 schema completeness, AC-2 source-label set
  containment, AC-3 WGS84 range + int32 1e-7 decode
- test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario

Tests: 248 unit tests pass (+91 vs batch 68).
Reports: batch_69_report.md, batch_69_review.md (PASS),
cumulative_review_batches_67-69_cycle1_report.md (PASS).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 17:54:00 +03:00

222 lines
8.2 KiB
Python

"""Shared helpers for the AZ-408 runtime synthetic-injection fixture builders.
Three responsibilities, each kept deliberately small:
1. **Deterministic seed derivation** — every injector accepts an integer
``--seed`` flag and must produce bit-identical output across two runs
for the same ``(seed, density|window_seconds|n_segments)`` pair. The
shared ``derive_rng()`` helper hashes the inputs into a 64-bit seed,
so two unrelated injectors don't accidentally share a stream.
2. **Tile-cache manifest read** — the outlier injector needs to pick a
"far-away" tile (per AC-3.1: ≥350 m offset). The tile-cache fixture
(built by AZ-407 / ``e2e/fixtures/tile-cache-builder/builder.py``)
ships a ``manifest.csv`` with the per-tile ground-truth lat/lon
derivable from ``(zoom_level, tile_x, tile_y)`` via the slippy-map
convention. We read the CSV ourselves rather than depending on the
builder package — that keeps the injectors independently testable
without a Docker tile-cache volume present.
3. **Tmpfs scratch root** — AC-6 says "auto-cleared at teardown within
≤2 s". We expose ``tmpfs_root(run_id, scenario)`` so every injector
writes under the same predictable parent (``/tmp/<run_id>/<scenario>/``)
and the pytest fixture wrapper can shutil.rmtree on teardown.
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import csv
import hashlib
import math
import shutil
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import numpy as np
# Default parent directory for injector scratch trees; ``tmpfs_root`` falls
# back to this when the caller does not supply ``base``.
DEFAULT_SCRATCH_ROOT = Path("/tmp")
def derive_rng(domain: str, *components: object) -> np.random.Generator:
    """Return a deterministic NumPy RNG keyed on ``(domain, components...)``.

    The domain string is a short unique tag per injector (``"outlier"``,
    ``"blackout_spoof"``, ``"multi_segment"``); the components are the
    user-visible knobs (seed, density, window_seconds, etc.).

    Two invocations with the same arguments return generators that produce
    the same sequence of values. Changing ``domain`` (or any component)
    changes the hashed payload and therefore the seed.

    Args:
        domain: Tag that namespaces the stream.
        *components: Extra key parts; each one is converted with ``str()``.

    Returns:
        ``np.random.default_rng`` seeded with the first 8 bytes (big-endian,
        unsigned) of the SHA-256 digest of ``"domain|c1|c2|..."``.

    Note:
        Parts are joined with ``"|"`` without escaping, so a part that
        itself contains ``"|"`` can collide with a differently split key.
    """
    payload = "|".join((domain, *(str(part) for part in components)))
    # UTF-8 yields the same bytes as ASCII for ASCII-only payloads, so every
    # previously derived seed is unchanged; the difference is that non-ASCII
    # domains/components no longer raise UnicodeEncodeError.
    digest = hashlib.sha256(payload.encode("utf-8")).digest()
    seed64 = int.from_bytes(digest[:8], byteorder="big")
    return np.random.default_rng(seed64)
def tmpfs_root(run_id: str, scenario: str, base: Path | None = None) -> Path:
"""Return ``<base>/<run_id>/<scenario>/`` (created); used by every injector.
The pytest fixture wrapper passes ``base = pytest's tmp_path_factory``
so unit-test runs stay inside the pytest tmp tree rather than ``/tmp``.
"""
base = base or DEFAULT_SCRATCH_ROOT
out = base / run_id / scenario
out.mkdir(parents=True, exist_ok=True)
return out
def cleanup_tmpfs(path: Path) -> None:
    """Remove the directory tree at ``path``; do nothing if it is absent.

    Intended for pytest fixture teardown. AC-6 requires the removal to
    finish within 2 s; a single-scenario directory holding a few thousand
    small files is expected to be removed far faster than that.

    Args:
        path: Scratch directory to delete recursively.
    """
    if not path.exists():
        # Nothing was created (or it is already gone): silent no-op.
        return
    shutil.rmtree(path)
# ---------------------------------------------------------------------------
# Tile-cache manifest read (AZ-407 schema)
# ---------------------------------------------------------------------------
# Slippy-map convention — see e2e/fixtures/tile-cache-builder/builder.py
# (where DEFAULT_ZOOM = 18). The constants below are the contract this module
# relies on; they are deliberately NOT imported from the builder, to avoid a
# runtime dependency on the tile-cache-builder package at injector-test time.
# NOTE(review): _TILE_SIZE is not referenced by the code visible in this
# module — confirm it is still needed.
_TILE_SIZE = 256  # px
@dataclass(frozen=True)
class TileGtRow:
    """Immutable record for one tile-cache manifest row plus its tile centre.

    ``read_tile_manifest`` fills the first nine fields from the manifest
    columns of the same name and derives the two ``centre_*`` fields from
    ``(zoom_level, tile_x, tile_y)``.
    """

    # Slippy-map tile address.
    zoom_level: int
    tile_x: int
    tile_y: int

    # Manifest metadata; the string columns are stored verbatim.
    capture_date: str
    source: str
    m_per_px: float  # presumably ground resolution in metres per pixel
    jpeg_path: str
    content_hash: str
    provenance: str

    # Derived geographic centre of the tile, in degrees.
    centre_lat_deg: float
    centre_lon_deg: float
def _tile_centre_lat_lon(zoom: int, tx: int, ty: int) -> tuple[float, float]:
    """Convert a slippy XYZ tile address to its centre ``(lat_deg, lon_deg)``.

    Applies the standard Web-Mercator inverse to the tile coordinates
    shifted by ``+0.5`` on both axes, so the result is the tile centre
    rather than its NW corner.

    Args:
        zoom: Slippy-map zoom level; the grid has ``2 ** zoom`` tiles per axis.
        tx: Tile column index.
        ty: Tile row index.

    Returns:
        ``(lat_deg, lon_deg)`` — latitude first.
    """
    tiles_per_axis = 2.0 ** zoom
    centre_x = tx + 0.5
    centre_y = ty + 0.5
    lon_deg = centre_x / tiles_per_axis * 360.0 - 180.0
    mercator_y = math.pi * (1 - 2 * centre_y / tiles_per_axis)
    lat_deg = math.degrees(math.atan(math.sinh(mercator_y)))
    return lat_deg, lon_deg
def read_tile_manifest(manifest_csv: Path) -> list[TileGtRow]:
    """Parse the tile-cache ``manifest.csv`` (AZ-407 schema) into typed rows.

    Each row gets a derived ``(centre_lat_deg, centre_lon_deg)`` computed
    from the slippy tile coordinates — the injectors use this for the
    "far-away crop" geodesic check (AC-2).

    The file is decoded as UTF-8 explicitly so parsing does not depend on
    the host locale's default encoding.

    Args:
        manifest_csv: Path to the manifest CSV. It must have a header row
            with the columns ``zoom_level``, ``tile_x``, ``tile_y``,
            ``capture_date``, ``source``, ``m_per_px``, ``jpeg_path``,
            ``content_hash`` and ``provenance``.

    Returns:
        One ``TileGtRow`` per data row, in file order.

    Raises:
        FileNotFoundError: The manifest is missing. The injector CLI
            surfaces this with an explicit "build the tile-cache fixture
            first" message; there is deliberately no silent fallback to a
            stub manifest, which would hide a misconfigured test run.
        ValueError: The manifest has no data rows, or a numeric column
            cannot be converted.
        KeyError: A required column is absent from a row.
    """
    if not manifest_csv.is_file():
        raise FileNotFoundError(
            f"tile-cache manifest not found at {manifest_csv} — build the "
            "tile-cache fixture first (`./e2e/fixtures/tile-cache-builder/build.sh`)"
        )
    rows: list[TileGtRow] = []
    # newline="" is what the csv module requires for correct handling of
    # embedded newlines; the encoding is pinned so the same manifest parses
    # identically on every host.
    with manifest_csv.open("r", newline="", encoding="utf-8") as fp:
        for raw in csv.DictReader(fp):
            zoom = int(raw["zoom_level"])
            tx = int(raw["tile_x"])
            ty = int(raw["tile_y"])
            lat, lon = _tile_centre_lat_lon(zoom, tx, ty)
            rows.append(
                TileGtRow(
                    zoom_level=zoom,
                    tile_x=tx,
                    tile_y=ty,
                    capture_date=raw["capture_date"],
                    source=raw["source"],
                    m_per_px=float(raw["m_per_px"]),
                    jpeg_path=raw["jpeg_path"],
                    content_hash=raw["content_hash"],
                    provenance=raw["provenance"],
                    centre_lat_deg=lat,
                    centre_lon_deg=lon,
                )
            )
    if not rows:
        raise ValueError(f"tile-cache manifest at {manifest_csv} is empty")
    return rows
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters (Haversine) on a 6 371 000 m sphere.

    Used by the injector "far-away" check. This is deliberately
    re-implemented rather than importing ``runner.helpers.geo.distance_m`` —
    the injectors must work without pyproj installed (the project's
    ``[dev]`` extra installs pyproj, but the injectors run inside minimal
    Docker images and on bare ground stations).

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        The distance in meters. NaN inputs propagate to a NaN result.
    """
    earth_radius_m = 6_371_000.0
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlambda = math.radians(lon2 - lon1) / 2
    a = (
        math.sin(half_dphi) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlambda) ** 2
    )
    # Rounding can leave ``a`` a few ULP outside [0, 1] for (near-)antipodal
    # points, which would make sqrt/asin raise "math domain error". Clamp
    # with comparisons (not min/max) so NaN passes through untouched.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return float(2 * earth_radius_m * math.asin(math.sqrt(a)))
def far_away_indices(
    rows: list[TileGtRow],
    src_idx: int,
    min_offset_m: float,
) -> list[int]:
    """List the indices of rows at least ``min_offset_m`` away from ``rows[src_idx]``.

    Distances are measured between tile centres with ``haversine_m``.

    Args:
        rows: Manifest rows carrying ``centre_lat_deg`` / ``centre_lon_deg``.
        src_idx: Index of the reference row; it is never included in the
            result.
        min_offset_m: Inclusive lower bound on the centre-to-centre
            distance, in meters.

    Returns:
        Matching indices in ascending order (empty when nothing qualifies).
    """
    origin = rows[src_idx]
    selected: list[int] = []
    for idx, candidate in enumerate(rows):
        if idx == src_idx:
            continue
        separation_m = haversine_m(
            origin.centre_lat_deg,
            origin.centre_lon_deg,
            candidate.centre_lat_deg,
            candidate.centre_lon_deg,
        )
        if separation_m >= min_offset_m:
            selected.append(idx)
    return selected
# ---------------------------------------------------------------------------
# Tiny utilities
# ---------------------------------------------------------------------------
def iter_video_frame_indices(total_frames: int, density_ratio: float) -> Iterable[int]:
    """Return the 1-of-N frame indices for the requested density ratio.

    Density is the fraction of frames replaced; e.g. ``density_ratio=0.1``
    selects every 10th frame. The stride is deterministic (NOT random
    sampling) so a unit test asserting "the X-th frame is replaced" stays
    stable.

    Args:
        total_frames: Exclusive upper bound of the frame indices.
        density_ratio: Fraction of frames to select; must lie in ``(0, 1]``.

    Returns:
        ``range(0, total_frames, stride)`` with
        ``stride = max(1, round(1 / density_ratio))``.

    Raises:
        ValueError: ``density_ratio`` is outside ``(0, 1]`` (NaN included).
    """
    # Kept as a positive chained comparison so NaN is rejected too.
    in_range = 0 < density_ratio <= 1.0
    if not in_range:
        raise ValueError(f"density_ratio must be in (0, 1]; got {density_ratio}")
    stride = round(1 / density_ratio)
    if stride < 1:
        stride = 1
    return range(0, total_frames, stride)