Oleksandr Bezdieniezhnykh 702a0c0ff3 [AZ-408] [AZ-410] [AZ-411] Batch 69: synth injectors + FT-P-02/03/14
AZ-408 (3pt) — Replace AZ-406 injector scaffolds with concrete generators:
- outlier.py: deterministic stride + far-away tile replacement; AC-2 ≥350m offset
- blackout_spoof.py: paired video blackout + FC GPS spoof with ≤40ms alignment;
  AC-4 realistic fix_type/hdop; AC-NEW-8 200-500m inter-spoof deltas
- multi_segment.py: ≥3 disjoint windows, ≥30s gaps, ≤25% coverage
- fc_proxy.py: timed-splice runtime proxy with pre-activate RuntimeError guard
- _common.py: derive_rng + tile-manifest reader + tmpfs helpers
- injector_fixtures.py: pytest fixtures wired via runner conftest

AZ-410 (3pt) — FT-P-02 cumulative drift between satellite anchors:
- anchor_pair_detector.py: AC-1 detection, AC-2/3 pass-fraction,
  AC-4 monotonicity check, CSV evidence
- test_ft_p_02_derkachi_drift.py: scenario gated on upstream helper
  NotImplementedError (frame_source_replay / fdr_reader / imu_replay)

AZ-411 (2pt) — FT-P-03 + FT-P-14 schema + WGS84:
- estimate_schema.py: AC-1 schema completeness, AC-2 source-label set
  containment, AC-3 WGS84 range + int32 1e-7 decode
- test_ft_p_03_14_schema_wgs84.py: shared single-image-push scenario

Tests: 248 unit tests pass (+91 vs batch 68).
Reports: batch_69_report.md, batch_69_review.md (PASS),
cumulative_review_batches_67-69_cycle1_report.md (PASS).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 17:54:00 +03:00

419 lines
15 KiB
Python
"""blackout-spoof-derkachi — synchronized visual blackout + GPS spoof (FT-N-04, NFT-RES-04).

Produces a **schedule** + paired runtime artefacts for a coordinated
visual-blackout / FC-GPS-spoof scenario. The schedule itself is the
single source of truth: the video-overlay portion AND the FC-inbound
proxy patch both read from it so the two streams stay synchronized
within AC-3 (≤40 ms wall-clock alignment).

What ``build()`` writes:

    <out_root>/
        schedule.json          # window_start_ms / window_end_ms,
                               # spoofed-GPS frame timeline
        frames/AD000001.jpg    # source frame, OR a black frame inside windows
        manifest.csv           # per-replaced-frame metadata for tests
        summary.json           # aggregate (window count, max alignment err, …)

The schedule's ``spoof_gps`` list is consumed by ``fc_proxy.py`` at run
time: the proxy walks its monotonic clock and, when ``now_ms`` falls
inside ``[window_start_ms, window_end_ms]``, replaces inbound GPS frames
with the next pre-computed spoofed record.

Determinism (AC-1 of AZ-408): identical ``(window_seconds, spoof_offset_m,
spoof_bearing_deg, seed)`` reproduce the same schedule and frame outputs.
Spoof-GPS values come from a ``derive_rng("blackout_spoof", …)`` stream;
window timing is deterministic-positional (anchored at 30 % of the source
duration so each window family ends inside the flight). The 200-500 m
inter-spoof delta requirement (AC-4 / AC-NEW-8) is enforced by the
delta-bound parameter, with no random rejection sampling.

Public-boundary discipline: this module does NOT import any
``src/gps_denied_onboard`` symbol.
"""

from __future__ import annotations

import argparse
import csv
import io
import json
import logging
import math
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path

import numpy as np

from ._common import derive_rng, tmpfs_root

logger = logging.getLogger(__name__)

# AC-NEW-8: spoofed GPS jumps 200-500 m between consecutive spoof frames.
_MIN_INTER_SPOOF_DELTA_M = 200.0
_MAX_INTER_SPOOF_DELTA_M = 500.0

# Spoofed-frame cadence: typical FC GPS update rate (10 Hz).
_SPOOF_HZ = 10.0

# AC-4: spoofed fields stay inside typical-flight ranges.
_SPOOF_FIX_TYPES = (3, 4)  # GPS_FIX_TYPE_3D / GPS_FIX_TYPE_DGPS
_SPOOF_HDOP_RANGE = (0.5, 2.5)

# Source-frame defaults, overrideable via CLI.
_DEFAULT_SRC_FPS = 30.0
_TILE_W = 256
_TILE_H = 256


@dataclass(frozen=True)
class BlackoutSpoofPlan:
    """Configuration for the blackout-spoof-derkachi fixture.

    AZ-408 replaces the AZ-406 scaffold dataclass; the previous shape
    (``blackout_seconds`` / ``spoof_offset_m`` / ``spoof_bearing_deg``)
    is preserved and extended with the inputs the runtime build path
    needs.
    """

    source_frames_dir: Path
    blackout_seconds: float
    seed: int = 0
    spoof_offset_m: float = 350.0
    spoof_bearing_deg: float = 45.0
    source_fps: float = _DEFAULT_SRC_FPS
    # AC-NEW-3: the proxy must START emitting spoofed GPS within ≤40 ms
    # of the first all-black video frame. This is a documented invariant
    # the runtime proxy enforces; we keep it in the plan as the
    # "promised" alignment so tests can assert against it.
    max_alignment_err_ms: float = 40.0
    initial_lat_deg: float = 50.075
    initial_lon_deg: float = 36.15


@dataclass(frozen=True)
class SpoofGpsFrame:
    """One spoofed GPS record: what fc_proxy will inject in place of real GPS."""

    monotonic_ms: int
    lat_deg: float
    lon_deg: float
    alt_m: float
    fix_type: int
    hdop: float


@dataclass(frozen=True)
class BlackoutSpoofSchedule:
    """The full coordinated timeline written to ``schedule.json``."""

    window_start_ms: int
    window_end_ms: int
    spoof_gps: list[SpoofGpsFrame] = field(default_factory=list)
    blackout_frame_indices: list[int] = field(default_factory=list)
    max_alignment_err_ms: float = 40.0


@dataclass(frozen=True)
class BlackoutSpoofReport:
    """Summary of a single ``build()`` run, written to ``summary.json``."""

    out_root: Path
    schedule: BlackoutSpoofSchedule
    blackout_frame_count: int
    spoof_frame_count: int
    inter_spoof_delta_m_min: float
    inter_spoof_delta_m_max: float


def _bearing_offset(
    lat: float, lon: float, bearing_deg: float, dist_m: float
) -> tuple[float, float]:
    """Project ``(lat, lon)`` along ``bearing_deg`` by ``dist_m`` (great-circle)."""
    R = 6_371_000.0  # mean Earth radius in metres (spherical model)
    br = math.radians(bearing_deg)
    lat1 = math.radians(lat)
    lon1 = math.radians(lon)
    ang = dist_m / R  # angular distance in radians
    lat2 = math.asin(
        math.sin(lat1) * math.cos(ang) + math.cos(lat1) * math.sin(ang) * math.cos(br)
    )
    lon2 = lon1 + math.atan2(
        math.sin(br) * math.sin(ang) * math.cos(lat1),
        math.cos(ang) - math.sin(lat1) * math.sin(lat2),
    )
    return math.degrees(lat2), math.degrees(lon2)


def _build_spoof_gps_track(
    plan: BlackoutSpoofPlan,
    window_start_ms: int,
    window_end_ms: int,
    rng: np.random.Generator,
) -> list[SpoofGpsFrame]:
    """Generate a spoofed-GPS track that satisfies AC-4 + AC-NEW-8.

    The track starts at the plan's initial point + spoof_offset_m along
    spoof_bearing_deg (the initial "jump" that defines the spoofed
    position). Subsequent frames jump 200-500 m in a randomly-perturbed
    bearing each step, enforced deterministically by the seeded RNG.
    """
    cadence_ms = int(round(1000.0 / _SPOOF_HZ))
    frames: list[SpoofGpsFrame] = []
    cur_lat, cur_lon = _bearing_offset(
        plan.initial_lat_deg, plan.initial_lon_deg, plan.spoof_bearing_deg, plan.spoof_offset_m
    )
    cur_alt = 300.0  # plausible-cruise altitude (matches `flight_derkachi/camera_info.md`)
    cur_bearing = plan.spoof_bearing_deg
    t = window_start_ms
    while t <= window_end_ms:
        delta_m = float(
            rng.uniform(_MIN_INTER_SPOOF_DELTA_M, _MAX_INTER_SPOOF_DELTA_M)
        )
        # Perturb bearing ±60° per step so the spoofed track looks like
        # a realistic-but-bad GPS noise pattern (not a straight line).
        cur_bearing = (cur_bearing + float(rng.uniform(-60.0, 60.0))) % 360.0
        cur_lat, cur_lon = _bearing_offset(cur_lat, cur_lon, cur_bearing, delta_m)
        # Stay inside realistic flight altitude range; small noise only.
        cur_alt += float(rng.uniform(-2.0, 2.0))
        fix_type = int(rng.choice(_SPOOF_FIX_TYPES))
        hdop = float(rng.uniform(*_SPOOF_HDOP_RANGE))
        frames.append(
            SpoofGpsFrame(
                monotonic_ms=t,
                lat_deg=round(cur_lat, 7),
                lon_deg=round(cur_lon, 7),
                alt_m=round(cur_alt, 3),
                fix_type=fix_type,
                hdop=round(hdop, 3),
            )
        )
        t += cadence_ms
    return frames


def _black_jpeg_bytes() -> bytes:
    """All-black 256×256 JPEG using the project's pinned PIL settings."""
    from PIL import Image  # noqa: PLC0415 - heavy import, deferred

    img = Image.new("RGB", (_TILE_W, _TILE_H), color=(0, 0, 0))
    buf = io.BytesIO()
    img.save(
        buf,
        format="JPEG",
        quality=85,
        optimize=False,
        progressive=False,
        subsampling=2,
    )
    return buf.getvalue()


def build(plan: BlackoutSpoofPlan, out_root: Path) -> BlackoutSpoofReport:
    """Generate the blackout-spoof-derkachi fixture under ``out_root``."""
    if plan.blackout_seconds <= 0:
        raise ValueError(f"blackout_seconds must be > 0; got {plan.blackout_seconds}")

    # Validate the source BEFORE touching ``out_root`` so that a bad
    # source path cannot wipe a previously generated fixture.
    src_dir = plan.source_frames_dir
    if not src_dir.is_dir():
        raise FileNotFoundError(f"source frames directory not found: {src_dir}")
    frames = sorted(src_dir.glob("AD*.jpg"))
    if not frames:
        raise FileNotFoundError(f"no AD*.jpg frames under {src_dir}")

    if out_root.exists():
        shutil.rmtree(out_root)
    (out_root / "frames").mkdir(parents=True)

    total_frames = len(frames)
    src_duration_ms = int(round((total_frames / plan.source_fps) * 1000.0))

    # Anchor the window at 30 % of the source duration. The window must
    # fit inside the source: if the requested blackout is longer than
    # the remaining flight, fall back to "blackout from 30 % to end".
    window_start_ms = int(0.3 * src_duration_ms)
    window_end_ms = min(
        window_start_ms + int(plan.blackout_seconds * 1000), src_duration_ms
    )

    # Frame-index window in the source frame-stream (frames are at
    # ``source_fps`` Hz so a window of ``W`` ms maps to ``W/1000 * fps``
    # frames).
    first_blackout_frame = int(round(window_start_ms / 1000.0 * plan.source_fps))
    last_blackout_frame = int(round(window_end_ms / 1000.0 * plan.source_fps))
    blackout_indices = list(range(first_blackout_frame, min(last_blackout_frame, total_frames)))

    rng = derive_rng(
        "blackout_spoof",
        plan.seed,
        plan.blackout_seconds,
        plan.spoof_offset_m,
        plan.spoof_bearing_deg,
    )
    spoof_frames = _build_spoof_gps_track(plan, window_start_ms, window_end_ms, rng)
    schedule = BlackoutSpoofSchedule(
        window_start_ms=window_start_ms,
        window_end_ms=window_end_ms,
        spoof_gps=spoof_frames,
        blackout_frame_indices=blackout_indices,
        max_alignment_err_ms=plan.max_alignment_err_ms,
    )

    # Copy every source frame, substituting the black JPEG inside the window.
    black_jpeg = _black_jpeg_bytes()
    manifest_rows: list[dict] = []
    blackout_set = set(blackout_indices)
    for frame_idx, frame_path in enumerate(frames):
        out_path = out_root / "frames" / frame_path.name
        if frame_idx in blackout_set:
            out_path.write_bytes(black_jpeg)
            manifest_rows.append(
                {
                    "frame_idx": frame_idx,
                    "src_jpeg_path": frame_path.name,
                    "kind": "blackout",
                    "window_start_ms": window_start_ms,
                    "window_end_ms": window_end_ms,
                    "seed": plan.seed,
                }
            )
        else:
            shutil.copy2(frame_path, out_path)

    _write_schedule(out_root, schedule)
    _write_manifest(out_root, manifest_rows)

    # Distance between consecutive spoofed positions, for the summary.
    # The import is hoisted out of the loop; it previously ran per iteration.
    from ._common import haversine_m  # noqa: PLC0415

    deltas_m: list[float] = [
        haversine_m(prev.lat_deg, prev.lon_deg, nxt.lat_deg, nxt.lon_deg)
        for prev, nxt in zip(spoof_frames, spoof_frames[1:])
    ]

    report = BlackoutSpoofReport(
        out_root=out_root,
        schedule=schedule,
        blackout_frame_count=len(blackout_indices),
        spoof_frame_count=len(spoof_frames),
        inter_spoof_delta_m_min=min(deltas_m) if deltas_m else 0.0,
        inter_spoof_delta_m_max=max(deltas_m) if deltas_m else 0.0,
    )
    _write_summary(out_root, report)
    return report


def _write_schedule(out_root: Path, schedule: BlackoutSpoofSchedule) -> None:
    payload = {
        "window_start_ms": schedule.window_start_ms,
        "window_end_ms": schedule.window_end_ms,
        "max_alignment_err_ms": schedule.max_alignment_err_ms,
        "blackout_frame_indices": schedule.blackout_frame_indices,
        "spoof_gps": [
            {
                "monotonic_ms": f.monotonic_ms,
                "lat_deg": f.lat_deg,
                "lon_deg": f.lon_deg,
                "alt_m": f.alt_m,
                "fix_type": f.fix_type,
                "hdop": f.hdop,
            }
            for f in schedule.spoof_gps
        ],
    }
    (out_root / "schedule.json").write_text(
        json.dumps(payload, sort_keys=True, indent=2) + "\n"
    )


def _write_manifest(out_root: Path, rows: list[dict]) -> None:
    manifest = out_root / "manifest.csv"
    with manifest.open("w", newline="") as fp:
        writer = csv.DictWriter(
            fp,
            fieldnames=["frame_idx", "src_jpeg_path", "kind", "window_start_ms", "window_end_ms", "seed"],
            lineterminator="\n",
        )
        writer.writeheader()
        for row in sorted(rows, key=lambda r: r["frame_idx"]):
            writer.writerow(row)


def _write_summary(out_root: Path, report: BlackoutSpoofReport) -> None:
    payload = {
        "scenario": "blackout-spoof-derkachi",
        "window_start_ms": report.schedule.window_start_ms,
        "window_end_ms": report.schedule.window_end_ms,
        "blackout_frame_count": report.blackout_frame_count,
        "spoof_frame_count": report.spoof_frame_count,
        "inter_spoof_delta_m_min": round(report.inter_spoof_delta_m_min, 3),
        "inter_spoof_delta_m_max": round(report.inter_spoof_delta_m_max, 3),
        "max_alignment_err_ms": report.schedule.max_alignment_err_ms,
    }
    (out_root / "summary.json").write_text(
        json.dumps(payload, sort_keys=True, indent=2) + "\n"
    )


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description="Blackout + spoofed-GPS injection (FT-N-04)")
    parser.add_argument("--source-frames", type=Path, required=True)
    parser.add_argument(
        "--window-seconds",
        type=float,
        required=True,
        help="Blackout window length in seconds (5/15/35 for FT-N-04 / NFT-RES-04 family)",
    )
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument("--spoof-offset-m", type=float, default=350.0)
    parser.add_argument("--spoof-bearing-deg", type=float, default=45.0)
    parser.add_argument("--source-fps", type=float, default=_DEFAULT_SRC_FPS)
    parser.add_argument(
        "--out-root",
        type=Path,
        default=None,
        help="Output dir. If omitted, /tmp/<run_id>/blackout-spoof-<window_seconds>s/.",
    )
    parser.add_argument("--run-id", default="local")
    parser.add_argument("--quiet", action="store_true")
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.WARNING if args.quiet else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    out_root = args.out_root or tmpfs_root(
        args.run_id, f"blackout-spoof-{int(args.window_seconds)}s"
    )
    plan = BlackoutSpoofPlan(
        source_frames_dir=args.source_frames,
        blackout_seconds=args.window_seconds,
        seed=args.seed,
        spoof_offset_m=args.spoof_offset_m,
        spoof_bearing_deg=args.spoof_bearing_deg,
        source_fps=args.source_fps,
    )
    report = build(plan, out_root)

    # Echo the same aggregate that ``summary.json`` holds, plus ``out_root``.
    summary = {
        "scenario": "blackout-spoof-derkachi",
        "out_root": str(report.out_root),
        "window_start_ms": report.schedule.window_start_ms,
        "window_end_ms": report.schedule.window_end_ms,
        "blackout_frame_count": report.blackout_frame_count,
        "spoof_frame_count": report.spoof_frame_count,
        "inter_spoof_delta_m_min": round(report.inter_spoof_delta_m_min, 3),
        "inter_spoof_delta_m_max": round(report.inter_spoof_delta_m_max, 3),
        "max_alignment_err_ms": report.schedule.max_alignment_err_ms,
    }
    json.dump(summary, sys.stdout, sort_keys=True, indent=2)
    sys.stdout.write("\n")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
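
Usage note (not part of the module). The module docstring says that fc_proxy.py walks its monotonic clock and, while now_ms is inside [window_start_ms, window_end_ms], replaces each inbound GPS frame with the next pre-computed spoofed record. The real fc_proxy.py is not shown here, so the sketch below is only an illustration of that lookup under stated assumptions: the SpoofCursor class, its on_gps_frame method, and the pass-through behaviour once the spoofed records run out are all hypothetical, and the schedule dict is a hand-written stand-in that uses the same keys as schedule.json.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SpoofCursor:
    """Hypothetical consumer of a schedule.json payload (not the real fc_proxy)."""

    schedule: dict[str, Any]
    next_index: int = 0  # position of the next unused spoofed record

    def on_gps_frame(self, now_ms: int, real_frame: dict[str, Any]) -> dict[str, Any]:
        """Return the frame to forward for an inbound GPS frame seen at now_ms."""
        start = self.schedule["window_start_ms"]
        end = self.schedule["window_end_ms"]
        records = self.schedule["spoof_gps"]
        inside_window = start <= now_ms <= end
        # Assumption: outside the window, or once records are exhausted,
        # the real frame passes through unchanged.
        if not inside_window or self.next_index >= len(records):
            return real_frame
        record = records[self.next_index]
        self.next_index += 1
        return record


# Hand-written stand-in for a loaded schedule.json (values are illustrative).
schedule = {
    "window_start_ms": 1000,
    "window_end_ms": 1200,
    "spoof_gps": [
        {"monotonic_ms": 1000, "lat_deg": 50.08, "lon_deg": 36.16},
        {"monotonic_ms": 1100, "lat_deg": 50.09, "lon_deg": 36.17},
        {"monotonic_ms": 1200, "lat_deg": 50.10, "lon_deg": 36.18},
    ],
}
cursor = SpoofCursor(schedule)
real = {"lat_deg": 50.075, "lon_deg": 36.15}
outputs = [cursor.on_gps_frame(t, real) for t in (900, 1000, 1100, 1300)]
```

In this sketch the frames at 900 ms and 1300 ms fall outside the window and are forwarded as-is, while the frames at 1000 ms and 1100 ms are replaced by the first two spoofed records in order. Keeping a cursor instead of matching on monotonic_ms is a simplification; a real proxy would also need to honour the ≤40 ms alignment budget carried in max_alignment_err_ms.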