Files
Oleksandr Bezdieniezhnykh 702a0c0ff3 [AZ-408] [AZ-410] [AZ-411] Batch 69: synth injectors + FT-P-02/03/14
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators:
- outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset
- blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment;
  AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas
- multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage
- fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard
- _common.py: derive_rng + tile-manifest reader + tmpfs helpers
- injector_fixtures.py: pytest fixtures wired via runner conftest

AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors:
- anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction,
  AC-4 monotonicity check, CSV evidence
- test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper
  NotImplementedError (frame_source_replay / fdr_reader / imu_replay)

AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84:
- estimate_schema.py: AC-1 schema completeness, AC-2 source-label set
  containment, AC-3 WGS84 range + int32 1e-7 decode
- test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario

Tests: 248 unit tests pass (+91 vs batch 68).
Reports: batch_69_report.md, batch_69_review.md (PASS),
cumulative_review_batches_67-69_cycle1_report.md (PASS).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 17:54:00 +03:00

311 lines
11 KiB
Python

"""outlier-injection-derkachi — overlay far-away tile crops onto Derkachi frames (FT-N-01).
Produces a per-test tmpfs fixture whose ``frames/`` subdirectory mirrors
the source Derkachi frames byte-for-byte EXCEPT that selected frames are
replaced with a JPEG crop pulled from a tile whose centre is ≥350 m
(AC-3.1) from the original frame's GT centre. The companion
``manifest.csv`` records, per replaced frame, ``(frame_idx, src_jpeg_path,
replacement_tile_x, replacement_tile_y, replacement_zoom, geodesic_offset_m,
density, seed)`` so the downstream FT-N-01 / FT-P-08 / NFT-RES-04 tests can
assert AC-3.1 directly without re-deriving the geo math.
Density flags ≈ AZ-408 AC-1 / AC-2:
* ``light`` → 1 in 100 frames (replacement ratio 0.01)
* ``medium`` → 1 in 10 frames (replacement ratio 0.10)
* ``heavy`` → 1 in 3 frames (replacement ratio ≈ 0.333)
Determinism (AC-1):
* The frame indices replaced are computed by a deterministic stride
(``_common.iter_video_frame_indices``) — not by random sampling — so two
runs replace the *same* frames.
* The replacement tile for each replaced frame is picked from a
``_common.derive_rng("outlier", seed, density)`` stream — same seed →
same picks.
* Output filenames mirror the source filenames; JPEG bodies are re-encoded
through a pinned PIL pipeline (``quality=85, optimize=False,
progressive=False, subsampling=2``) so the bytes are stable.
Tmpfs (AC-6): the injector writes only under the ``out_root`` directory the
caller passes in; the pytest fixture wrapper takes care of teardown.
Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""
from __future__ import annotations
import argparse
import csv
import io
import json
import logging
import shutil
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from ._common import (
derive_rng,
far_away_indices,
haversine_m,
iter_video_frame_indices,
read_tile_manifest,
tmpfs_root,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The three density flags accepted by the plan dataclass and the CLI.
Density = Literal["light", "medium", "heavy"]

# Replacement ratio handed to ``iter_video_frame_indices`` for each density
# flag: 1 in 100, 1 in 10 and 1 in 3 frames respectively.
_DENSITY_RATIO: dict[Density, float] = dict(
    light=1 / 100,
    medium=1 / 10,
    heavy=1 / 3,
)

# Pixel dimensions every replacement tile is resized to before re-encoding.
_TILE_W, _TILE_H = 256, 256
@dataclass(frozen=True)
class OutlierInjectionPlan:
    """Immutable inputs for one outlier-injection-derkachi ``build()`` run.

    This is the AZ-408 shape. The AZ-406 scaffold fields
    (``target_segment_seconds`` / ``max_offset_m`` / ``n_outliers``) were
    placeholders and no test uses them any more.

    Attributes:
        source_frames_dir: Directory scanned for ``AD*.jpg`` source frames.
        tile_cache_dir: Directory holding the tile ``manifest.csv``; tile
            JPEG paths are resolved relative to it.
        density: Density flag selecting the replacement ratio.
        seed: Seed forwarded to ``derive_rng`` for the replacement picks.
        min_offset_m: Minimum offset, in metres, forwarded to
            ``far_away_indices`` when selecting candidate tiles.
    """

    source_frames_dir: Path
    tile_cache_dir: Path
    density: Density
    seed: int = 0
    min_offset_m: float = 350.0
@dataclass(frozen=True)
class OutlierInjectionReport:
    """Summary of a single ``build()`` run.

    ``build()`` returns this object and ``_write_summary`` serialises it to
    ``summary.json`` (every field except ``out_root``). The per-frame rows
    live in ``manifest.csv``, not in this report.

    Attributes:
        out_root: Directory the fixture was written under.
        total_source_frames: Number of ``AD*.jpg`` source frames processed.
        replaced_frame_count: Number of frames replaced by a tile crop.
        density: Density flag the run was built with.
        min_geodesic_offset_m: Smallest offset among the replaced frames,
            or ``0.0`` when no frame was replaced.
        max_geodesic_offset_m: Largest offset among the replaced frames,
            or ``0.0`` when no frame was replaced.
    """

    out_root: Path
    total_source_frames: int
    replaced_frame_count: int
    density: Density
    min_geodesic_offset_m: float
    max_geodesic_offset_m: float
def _gt_centre_for_frame(
    frame_idx: int,
    tiles: list,
) -> tuple[float, float, int]:
    """Map a source frame to a ``(lat, lon, src_tile_idx)`` triple.

    Resolution order:

    1. The first tile whose ``provenance`` starts with ``paired_gmaps:`` and
       ends with ``AD{frame_idx + 1:06d}`` (the frame's paired tile).
    2. Otherwise the first tile whose ``provenance`` starts with
       ``STUB_BBOX:`` (the bbox fallback).
    3. Otherwise the first tile in the manifest, so the injector still runs.

    Args:
        frame_idx: Zero-based frame index; the paired tile is looked up under
            the one-based ``AD`` number ``frame_idx + 1``.
        tiles: Tile-manifest rows exposing ``provenance``,
            ``centre_lat_deg`` and ``centre_lon_deg`` attributes.

    Returns:
        The chosen tile's centre latitude, centre longitude and its index
        in ``tiles``.

    Raises:
        ValueError: If ``tiles`` is empty, so no centre can be resolved.
    """
    if not tiles:
        raise ValueError(
            f"cannot resolve a GT centre for frame index {frame_idx}: "
            "tile manifest is empty"
        )

    paired_suffix = f"AD{frame_idx + 1:06d}"
    bbox_idx = None
    # Single pass: return on the first paired match, and remember the first
    # bbox tile in case no paired tile exists anywhere in the manifest.
    for tile_idx, tile in enumerate(tiles):
        provenance = tile.provenance
        if provenance.startswith("paired_gmaps:") and provenance.endswith(
            paired_suffix
        ):
            return tile.centre_lat_deg, tile.centre_lon_deg, tile_idx
        if bbox_idx is None and provenance.startswith("STUB_BBOX:"):
            bbox_idx = tile_idx

    fallback_idx = 0 if bbox_idx is None else bbox_idx
    fallback = tiles[fallback_idx]
    return fallback.centre_lat_deg, fallback.centre_lon_deg, fallback_idx
def _read_replacement_jpeg(tile_cache_dir: Path, jpeg_path: str) -> bytes:
    """Read a tile JPEG and re-encode it through PIL with pinned settings.

    Re-encoding (rather than a raw copy) guarantees the body matches the
    builder's encode (PIL ``quality=85, optimize=False, progressive=False,
    subsampling=2``) even if the tile was written by a foreign tool.

    Args:
        tile_cache_dir: Root directory of the tile cache.
        jpeg_path: Path of the tile JPEG relative to ``tile_cache_dir``.

    Returns:
        The re-encoded JPEG body of the tile, converted to RGB and resized
        to ``_TILE_W`` x ``_TILE_H`` pixels.
    """
    from PIL import Image  # noqa: PLC0415 — heavy import, deferred

    src = tile_cache_dir / jpeg_path
    # The context manager closes the underlying file deterministically;
    # ``convert`` forces the pixel data to load before the file is closed and
    # ``resize`` returns an independent image that outlives the ``with``.
    with Image.open(src) as raw:
        tile = raw.convert("RGB").resize((_TILE_W, _TILE_H), Image.BICUBIC)

    buf = io.BytesIO()
    tile.save(
        buf,
        format="JPEG",
        quality=85,
        optimize=False,
        progressive=False,
        subsampling=2,
    )
    return buf.getvalue()
def _is_within(path: Path, root: Path) -> bool:
    """Return True when ``path`` resolves to ``root`` or to a location beneath it."""
    resolved_root = root.resolve()
    resolved_path = path.resolve()
    return resolved_path == resolved_root or resolved_root in resolved_path.parents


def build(plan: OutlierInjectionPlan, out_root: Path) -> OutlierInjectionReport:
    """Generate the outlier-injection-derkachi fixture under ``out_root``.

    Writes::

        <out_root>/
            frames/AD000001.jpg   # passthrough or replaced
            frames/AD000002.jpg   # …
            manifest.csv          # per-replaced-frame metadata
            summary.json          # report fields, machine-readable

    All inputs are validated BEFORE ``out_root`` is wiped, so a bad plan never
    destroys the output of a previous run, and an ``out_root`` that contains
    one of the input directories is rejected instead of deleting the inputs.

    Args:
        plan: Source/tile directories, density flag, seed and minimum offset.
        out_root: Output directory. Any existing content is removed first.

    Returns:
        An ``OutlierInjectionReport`` summarising the run.

    Raises:
        FileNotFoundError: If the source frames directory is missing or
            contains no ``AD*.jpg`` frame.
        ValueError: If the source frames directory or the tile cache
            directory is ``out_root`` itself or lies inside it.
        RuntimeError: If no tile is far enough from a frame to be replaced.
    """
    src_dir = plan.source_frames_dir
    if not src_dir.is_dir():
        raise FileNotFoundError(f"source frames directory not found: {src_dir}")
    frames = sorted(src_dir.glob("AD*.jpg"))
    if not frames:
        raise FileNotFoundError(f"no AD*.jpg frames under {src_dir}")

    # Wiping out_root must never take the inputs with it.
    for label, input_dir in (
        ("source frames directory", src_dir),
        ("tile cache directory", plan.tile_cache_dir),
    ):
        if _is_within(input_dir, out_root):
            raise ValueError(
                f"out_root {out_root} would be wiped but contains the "
                f"{label} {input_dir}"
            )

    tiles = read_tile_manifest(plan.tile_cache_dir / "manifest.csv")
    ratio = _DENSITY_RATIO[plan.density]
    replace_indices = set(iter_video_frame_indices(len(frames), ratio))
    rng = derive_rng("outlier", plan.seed, plan.density)

    # Inputs are valid: only now discard any previous output.
    if out_root.exists():
        shutil.rmtree(out_root)
    frames_out = out_root / "frames"
    frames_out.mkdir(parents=True)

    manifest_rows: list[dict] = []
    geodesic_offsets: list[float] = []
    for frame_idx, frame_path in enumerate(frames):
        out_path = frames_out / frame_path.name
        if frame_idx not in replace_indices:
            # Passthrough frame: plain file copy of the source.
            shutil.copy2(frame_path, out_path)
            continue

        # NOTE(review): frame_idx is the position in the sorted listing, and
        # _gt_centre_for_frame looks for AD{frame_idx + 1:06d}; that equals
        # the file's own number only when the frames are numbered
        # contiguously from AD000001 — confirm against the dataset.
        src_lat, src_lon, src_tile_idx = _gt_centre_for_frame(frame_idx, tiles)
        candidates = far_away_indices(tiles, src_tile_idx, plan.min_offset_m)
        if not candidates:
            raise RuntimeError(
                f"no tile in {plan.tile_cache_dir} is ≥{plan.min_offset_m} m "
                f"from frame {frame_path.name} — tile cache too small for "
                "outlier injection"
            )
        # One draw per replaced frame, in frame order, from the seeded stream.
        pick_idx = int(rng.integers(0, len(candidates)))
        chosen = tiles[candidates[pick_idx]]
        offset_m = haversine_m(
            src_lat, src_lon, chosen.centre_lat_deg, chosen.centre_lon_deg
        )
        geodesic_offsets.append(offset_m)

        jpeg = _read_replacement_jpeg(plan.tile_cache_dir, chosen.jpeg_path)
        out_path.write_bytes(jpeg)
        manifest_rows.append(
            {
                "frame_idx": frame_idx,
                "src_jpeg_path": str(frame_path.name),
                "replacement_tile_x": chosen.tile_x,
                "replacement_tile_y": chosen.tile_y,
                "replacement_zoom": chosen.zoom_level,
                "geodesic_offset_m": f"{offset_m:.3f}",
                "density": plan.density,
                "seed": plan.seed,
            }
        )

    _write_manifest(out_root, manifest_rows)
    report = OutlierInjectionReport(
        out_root=out_root,
        total_source_frames=len(frames),
        replaced_frame_count=len(manifest_rows),
        density=plan.density,
        # 0.0 stands in for "no frame was replaced".
        min_geodesic_offset_m=min(geodesic_offsets) if geodesic_offsets else 0.0,
        max_geodesic_offset_m=max(geodesic_offsets) if geodesic_offsets else 0.0,
    )
    _write_summary(out_root, report)
    return report
def _write_manifest(out_root: Path, rows: list[dict]) -> None:
    """Write ``<out_root>/manifest.csv`` with one row per replaced frame.

    Rows are emitted in ascending ``frame_idx`` order. The file is written
    as UTF-8 with ``\\n`` line endings regardless of the host locale, so the
    bytes are stable across machines.

    Args:
        out_root: Existing directory that receives ``manifest.csv``.
        rows: Dicts keyed by the manifest column names.
    """
    columns = [
        "frame_idx",
        "src_jpeg_path",
        "replacement_tile_x",
        "replacement_tile_y",
        "replacement_zoom",
        "geodesic_offset_m",
        "density",
        "seed",
    ]
    # Sort before opening the file so a malformed row cannot leave a
    # header-only manifest behind.
    ordered = sorted(rows, key=lambda r: r["frame_idx"])
    manifest = out_root / "manifest.csv"
    # newline="" lets the csv module control line endings itself.
    with manifest.open("w", newline="", encoding="utf-8") as fp:
        writer = csv.DictWriter(fp, fieldnames=columns, lineterminator="\n")
        writer.writeheader()
        writer.writerows(ordered)
def _write_summary(out_root: Path, report: OutlierInjectionReport) -> None:
    """Serialise ``report`` to ``<out_root>/summary.json``.

    The payload carries the scenario name plus every report field except
    ``out_root``; both offsets are rounded to three decimals. Keys are
    sorted, indentation is two spaces, and the file ends with a newline.
    """
    lowest_m = round(report.min_geodesic_offset_m, 3)
    highest_m = round(report.max_geodesic_offset_m, 3)
    fields = dict(
        scenario="outlier-injection-derkachi",
        total_source_frames=report.total_source_frames,
        replaced_frame_count=report.replaced_frame_count,
        density=report.density,
        min_geodesic_offset_m=lowest_m,
        max_geodesic_offset_m=highest_m,
    )
    text = json.dumps(fields, sort_keys=True, indent=2)
    target = out_root / "summary.json"
    target.write_text(text + "\n")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: build the fixture and print a JSON summary.

    Parses ``argv`` (``sys.argv[1:]`` when ``None``), configures logging,
    runs ``build()`` and writes the run summary, including ``out_root``, to
    stdout as sorted, two-space-indented JSON followed by a newline.

    Returns:
        ``0`` once the fixture has been built.
    """
    cli = argparse.ArgumentParser(description="Outlier injection (FT-N-01)")
    cli.add_argument("--source-frames", type=Path, required=True)
    cli.add_argument("--tile-cache", type=Path, required=True)
    cli.add_argument("--density", choices=("light", "medium", "heavy"), required=True)
    cli.add_argument("--seed", type=int, default=0)
    cli.add_argument("--min-offset-m", type=float, default=350.0)
    cli.add_argument(
        "--out-root",
        type=Path,
        default=None,
        help="Output dir. If omitted, /tmp/<run_id>/outlier-<density>/.",
    )
    cli.add_argument("--run-id", default="local")
    cli.add_argument("--quiet", action="store_true")
    opts = cli.parse_args(argv)

    # --quiet raises the threshold so only warnings and errors are shown.
    log_level = logging.WARNING if opts.quiet else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    # Without an explicit --out-root, derive one from the run id and density.
    if opts.out_root is None:
        destination = tmpfs_root(opts.run_id, f"outlier-{opts.density}")
    else:
        destination = opts.out_root

    report = build(
        OutlierInjectionPlan(
            source_frames_dir=opts.source_frames,
            tile_cache_dir=opts.tile_cache,
            density=opts.density,
            seed=opts.seed,
            min_offset_m=opts.min_offset_m,
        ),
        destination,
    )

    summary = {
        "scenario": "outlier-injection-derkachi",
        "out_root": str(report.out_root),
        "total_source_frames": report.total_source_frames,
        "replaced_frame_count": report.replaced_frame_count,
        "density": report.density,
        "min_geodesic_offset_m": round(report.min_geodesic_offset_m, 3),
        "max_geodesic_offset_m": round(report.max_geodesic_offset_m, 3),
    }
    sys.stdout.write(json.dumps(summary, sort_keys=True, indent=2) + "\n")
    return 0


# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())