mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 09:51:13 +00:00
702a0c0ff3
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators: - outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset - blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment; AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas - multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage - fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard - _common.py: derive_rng + tile-manifest reader + tmpfs helpers - injector_fixtures.py: pytest fixtures wired via runner conftest AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors: - anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction, AC-4 monotonicity check, CSV evidence - test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper NotImplementedError (frame_source_replay / fdr_reader / imu_replay) AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84: - estimate_schema.py: AC-1 schema completeness, AC-2 source-label set containment, AC-3 WGS84 range + int32 1e-7 decode - test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario Tests: 248 unit tests pass (+91 vs batch 68). Reports: batch_69_report.md, batch_69_review.md (PASS), cumulative_review_batches_67-69_cycle1_report.md (PASS). Co-authored-by: Cursor <cursoragent@cursor.com>
311 lines
11 KiB
Python
311 lines
11 KiB
Python
"""outlier-injection-derkachi — overlay far-away tile crops onto Derkachi frames (FT-N-01).
|
|
|
|
Produces a per-test tmpfs fixture whose ``frames/`` subdirectory mirrors
|
|
the source Derkachi frames byte-for-byte EXCEPT that selected frames are
|
|
replaced with a JPEG crop pulled from a tile whose centre is ≥350 m
|
|
(AC-3.1) from the original frame's GT centre. The companion
|
|
``manifest.csv`` records, per replaced frame, ``(frame_idx, src_jpeg_path,
|
|
replacement_tile_x, replacement_tile_y, geodesic_offset_m, seed)`` so the
|
|
downstream FT-N-01 / FT-P-08 / NFT-RES-04 tests can assert AC-3.1 directly
|
|
without re-deriving the geo math.
|
|
|
|
Density flags ≈ AZ-408 AC-1 / AC-2:
|
|
|
|
* ``light`` → 1 in 100 frames (replacement ratio 0.01)
|
|
* ``medium`` → 1 in 10 frames (replacement ratio 0.10)
|
|
* ``heavy`` → 1 in 3 frames (replacement ratio ≈ 0.333)
|
|
|
|
Determinism (AC-1):
|
|
|
|
* The frame indices replaced are computed by a deterministic stride
|
|
(``_common.iter_video_frame_indices``) — not by random sampling — so two
|
|
runs replace the *same* frames.
|
|
* The replacement tile for each replaced frame is picked from a
|
|
``_common.derive_rng("outlier", seed, density)`` stream — same seed →
|
|
same picks.
|
|
* Output filenames mirror the source filenames; JPEG bodies are re-encoded
|
|
through a pinned PIL pipeline (``quality=85, optimize=False,
|
|
progressive=False, subsampling=2``) so the bytes are stable.
|
|
|
|
Tmpfs (AC-6): the injector writes only under the directory ``out_root``
|
|
passes in; the pytest fixture wrapper takes care of teardown.
|
|
|
|
Public-boundary discipline: this module does NOT import any
|
|
``src/gps_denied_onboard`` symbol.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import io
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Literal
|
|
|
|
from ._common import (
|
|
derive_rng,
|
|
far_away_indices,
|
|
haversine_m,
|
|
iter_video_frame_indices,
|
|
read_tile_manifest,
|
|
tmpfs_root,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
Density = Literal["light", "medium", "heavy"]
|
|
|
|
_DENSITY_RATIO: dict[Density, float] = {
|
|
"light": 1 / 100,
|
|
"medium": 1 / 10,
|
|
"heavy": 1 / 3,
|
|
}
|
|
|
|
_TILE_W = 256
|
|
_TILE_H = 256
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutlierInjectionPlan:
|
|
"""Configuration for the outlier-injection-derkachi fixture.
|
|
|
|
AZ-408 replaces the AZ-406 scaffold dataclass; the previous shape
|
|
(``target_segment_seconds`` / ``max_offset_m`` / ``n_outliers``) was
|
|
a placeholder and is no longer used by any test.
|
|
"""
|
|
|
|
source_frames_dir: Path
|
|
tile_cache_dir: Path
|
|
density: Density
|
|
seed: int = 0
|
|
min_offset_m: float = 350.0
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OutlierInjectionReport:
|
|
"""Summary of a single ``build()`` run — written to ``manifest.csv``."""
|
|
|
|
out_root: Path
|
|
total_source_frames: int
|
|
replaced_frame_count: int
|
|
density: Density
|
|
min_geodesic_offset_m: float
|
|
max_geodesic_offset_m: float
|
|
|
|
|
|
def _gt_centre_for_frame(
|
|
frame_idx: int,
|
|
tiles: list,
|
|
) -> tuple[float, float, int]:
|
|
"""Map a source frame to a (lat, lon, src_tile_idx) triple.
|
|
|
|
For the Derkachi fixture each AD-frame has a paired tile entry in
|
|
the tile-cache manifest (`paired_gmaps:ADNNNNNN` in the
|
|
`provenance` column). For unpaired frames we fall back to the
|
|
bbox tile (`STUB_BBOX:derkachi:*`); if even that's missing we
|
|
fall back to the first tile so the injector still runs.
|
|
"""
|
|
for j, r in enumerate(tiles):
|
|
if r.provenance.startswith("paired_gmaps:") and r.provenance.endswith(
|
|
f"AD{frame_idx + 1:06d}"
|
|
):
|
|
return r.centre_lat_deg, r.centre_lon_deg, j
|
|
for j, r in enumerate(tiles):
|
|
if r.provenance.startswith("STUB_BBOX:"):
|
|
return r.centre_lat_deg, r.centre_lon_deg, j
|
|
return tiles[0].centre_lat_deg, tiles[0].centre_lon_deg, 0
|
|
|
|
|
|
def _read_replacement_jpeg(tile_cache_dir: Path, jpeg_path: str) -> bytes:
|
|
"""Read + re-encode a tile JPEG through PIL with pinned settings.
|
|
|
|
Re-encoding (rather than raw copy) guarantees the body matches the
|
|
builder's encode (PIL ``quality=85, optimize=False, progressive=False,
|
|
subsampling=2``) even if the tile was written by a foreign tool.
|
|
"""
|
|
from PIL import Image # noqa: PLC0415 — heavy import, deferred
|
|
|
|
src = tile_cache_dir / jpeg_path
|
|
img = Image.open(src).convert("RGB").resize((_TILE_W, _TILE_H), Image.BICUBIC)
|
|
buf = io.BytesIO()
|
|
img.save(
|
|
buf,
|
|
format="JPEG",
|
|
quality=85,
|
|
optimize=False,
|
|
progressive=False,
|
|
subsampling=2,
|
|
)
|
|
return buf.getvalue()
|
|
|
|
|
|
def build(plan: OutlierInjectionPlan, out_root: Path) -> OutlierInjectionReport:
|
|
"""Generate the outlier-injection-derkachi fixture under ``out_root``.
|
|
|
|
Returns an ``OutlierInjectionReport`` summarising the run. Writes:
|
|
|
|
<out_root>/
|
|
frames/AD000001.jpg # passthrough or replaced
|
|
frames/AD000002.jpg # …
|
|
manifest.csv # per-replaced-frame metadata
|
|
summary.json # report fields, machine-readable
|
|
"""
|
|
if out_root.exists():
|
|
shutil.rmtree(out_root)
|
|
(out_root / "frames").mkdir(parents=True)
|
|
|
|
src_dir = plan.source_frames_dir
|
|
if not src_dir.is_dir():
|
|
raise FileNotFoundError(f"source frames directory not found: {src_dir}")
|
|
frames = sorted(src_dir.glob("AD*.jpg"))
|
|
if not frames:
|
|
raise FileNotFoundError(f"no AD*.jpg frames under {src_dir}")
|
|
|
|
tiles = read_tile_manifest(plan.tile_cache_dir / "manifest.csv")
|
|
|
|
ratio = _DENSITY_RATIO[plan.density]
|
|
replace_indices = set(iter_video_frame_indices(len(frames), ratio))
|
|
rng = derive_rng("outlier", plan.seed, plan.density)
|
|
|
|
manifest_rows: list[dict] = []
|
|
geodesic_offsets: list[float] = []
|
|
|
|
for frame_idx, frame_path in enumerate(frames):
|
|
out_path = out_root / "frames" / frame_path.name
|
|
if frame_idx not in replace_indices:
|
|
shutil.copy2(frame_path, out_path)
|
|
continue
|
|
|
|
src_lat, src_lon, src_tile_idx = _gt_centre_for_frame(frame_idx, tiles)
|
|
candidates = far_away_indices(tiles, src_tile_idx, plan.min_offset_m)
|
|
if not candidates:
|
|
raise RuntimeError(
|
|
f"no tile in {plan.tile_cache_dir} is ≥{plan.min_offset_m} m "
|
|
f"from frame {frame_path.name} — tile cache too small for "
|
|
"outlier injection"
|
|
)
|
|
pick_idx = int(rng.integers(0, len(candidates)))
|
|
chosen = tiles[candidates[pick_idx]]
|
|
offset_m = haversine_m(
|
|
src_lat, src_lon, chosen.centre_lat_deg, chosen.centre_lon_deg
|
|
)
|
|
geodesic_offsets.append(offset_m)
|
|
|
|
jpeg = _read_replacement_jpeg(plan.tile_cache_dir, chosen.jpeg_path)
|
|
out_path.write_bytes(jpeg)
|
|
|
|
manifest_rows.append(
|
|
{
|
|
"frame_idx": frame_idx,
|
|
"src_jpeg_path": str(frame_path.name),
|
|
"replacement_tile_x": chosen.tile_x,
|
|
"replacement_tile_y": chosen.tile_y,
|
|
"replacement_zoom": chosen.zoom_level,
|
|
"geodesic_offset_m": f"{offset_m:.3f}",
|
|
"density": plan.density,
|
|
"seed": plan.seed,
|
|
}
|
|
)
|
|
|
|
_write_manifest(out_root, manifest_rows)
|
|
report = OutlierInjectionReport(
|
|
out_root=out_root,
|
|
total_source_frames=len(frames),
|
|
replaced_frame_count=len(manifest_rows),
|
|
density=plan.density,
|
|
min_geodesic_offset_m=min(geodesic_offsets) if geodesic_offsets else 0.0,
|
|
max_geodesic_offset_m=max(geodesic_offsets) if geodesic_offsets else 0.0,
|
|
)
|
|
_write_summary(out_root, report)
|
|
return report
|
|
|
|
|
|
def _write_manifest(out_root: Path, rows: list[dict]) -> None:
|
|
manifest = out_root / "manifest.csv"
|
|
with manifest.open("w", newline="") as fp:
|
|
writer = csv.DictWriter(
|
|
fp,
|
|
fieldnames=[
|
|
"frame_idx",
|
|
"src_jpeg_path",
|
|
"replacement_tile_x",
|
|
"replacement_tile_y",
|
|
"replacement_zoom",
|
|
"geodesic_offset_m",
|
|
"density",
|
|
"seed",
|
|
],
|
|
lineterminator="\n",
|
|
)
|
|
writer.writeheader()
|
|
for row in sorted(rows, key=lambda r: r["frame_idx"]):
|
|
writer.writerow(row)
|
|
|
|
|
|
def _write_summary(out_root: Path, report: OutlierInjectionReport) -> None:
|
|
payload = {
|
|
"scenario": "outlier-injection-derkachi",
|
|
"total_source_frames": report.total_source_frames,
|
|
"replaced_frame_count": report.replaced_frame_count,
|
|
"density": report.density,
|
|
"min_geodesic_offset_m": round(report.min_geodesic_offset_m, 3),
|
|
"max_geodesic_offset_m": round(report.max_geodesic_offset_m, 3),
|
|
}
|
|
(out_root / "summary.json").write_text(
|
|
json.dumps(payload, sort_keys=True, indent=2) + "\n"
|
|
)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
parser = argparse.ArgumentParser(description="Outlier injection (FT-N-01)")
|
|
parser.add_argument("--source-frames", type=Path, required=True)
|
|
parser.add_argument("--tile-cache", type=Path, required=True)
|
|
parser.add_argument("--density", choices=("light", "medium", "heavy"), required=True)
|
|
parser.add_argument("--seed", type=int, default=0)
|
|
parser.add_argument("--min-offset-m", type=float, default=350.0)
|
|
parser.add_argument(
|
|
"--out-root",
|
|
type=Path,
|
|
default=None,
|
|
help="Output dir. If omitted, /tmp/<run_id>/outlier-<density>/.",
|
|
)
|
|
parser.add_argument("--run-id", default="local")
|
|
parser.add_argument("--quiet", action="store_true")
|
|
args = parser.parse_args(argv)
|
|
|
|
logging.basicConfig(
|
|
level=logging.WARNING if args.quiet else logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
)
|
|
|
|
out_root = args.out_root or tmpfs_root(args.run_id, f"outlier-{args.density}")
|
|
plan = OutlierInjectionPlan(
|
|
source_frames_dir=args.source_frames,
|
|
tile_cache_dir=args.tile_cache,
|
|
density=args.density,
|
|
seed=args.seed,
|
|
min_offset_m=args.min_offset_m,
|
|
)
|
|
report = build(plan, out_root)
|
|
summary = {
|
|
"scenario": "outlier-injection-derkachi",
|
|
"out_root": str(report.out_root),
|
|
"total_source_frames": report.total_source_frames,
|
|
"replaced_frame_count": report.replaced_frame_count,
|
|
"density": report.density,
|
|
"min_geodesic_offset_m": round(report.min_geodesic_offset_m, 3),
|
|
"max_geodesic_offset_m": round(report.max_geodesic_offset_m, 3),
|
|
}
|
|
json.dump(summary, sys.stdout, sort_keys=True, indent=2)
|
|
sys.stdout.write("\n")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|