"""multi-segment-derkachi — ≥3 disjoint blackout windows, NO spoof (FT-P-08). Generates a blackout-only fixture: ``n_segments`` disjoint all-black windows distributed across the Derkachi flight, with no paired GPS spoof. Drives the satellite-reference re-localization positive path; explicitly NOT the security failsafe path (that's FT-N-04 / NFT-RES-04, owned by the blackout_spoof injector). Constraints (AC-5): * ≥3 disjoint blackout windows. * Consecutive windows separated by ≥30 s of normal frames. * Total blackout coverage ≤25 % of the source duration. Window placement is deterministic-positional (anchored at fixed fractions of the source duration) rather than random — that keeps the test's "window N starts at second X" assertion stable. The seed is still accepted for API symmetry with the other injectors but currently does not affect the output (documented in the dataclass docstring); future NFT-RES-04 variants may use it to perturb segment lengths. Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import argparse import csv import io import json import logging import shutil import sys from dataclasses import dataclass from pathlib import Path from ._common import tmpfs_root logger = logging.getLogger(__name__) # Constraint constants (AC-5 of AZ-408). _MIN_INTER_SEGMENT_GAP_SECONDS = 30.0 _MAX_TOTAL_BLACKOUT_FRACTION = 0.25 _DEFAULT_SRC_FPS = 30.0 _TILE_W = 256 _TILE_H = 256 @dataclass(frozen=True) class MultiSegmentPlan: """Configuration for the multi-segment-derkachi fixture. AZ-408 replaces the AZ-406 scaffold dataclass; the previous shape (just ``n_segments`` + ``gap_seconds``) is extended to include the inputs the build path needs. ``seed`` is accepted for symmetry but is not currently consumed — segment placement is deterministic-positional. """ source_frames_dir: Path n_segments: int = 3 segment_seconds: float = 12.0 source_fps: float = _DEFAULT_SRC_FPS seed: int = 0 @dataclass(frozen=True) class SegmentWindow: start_ms: int end_ms: int first_frame_idx: int last_frame_idx: int @dataclass(frozen=True) class MultiSegmentReport: out_root: Path segments: list[SegmentWindow] source_duration_ms: int total_blackout_frames: int total_blackout_fraction: float def _plan_segments(plan: MultiSegmentPlan, total_frames: int) -> list[SegmentWindow]: """Compute the segment windows that satisfy AC-5. Strategy: place ``n_segments`` windows uniformly across the source duration, each window starts at ``(i+1) / (n+1)`` of the duration (so first window is not at t=0 and last window is not at t=END). Then validate the gap constraint + the total-coverage constraint and raise if the plan is infeasible (rather than silently truncating). """ if plan.n_segments < 3: raise ValueError(f"n_segments must be ≥3 (AC-5); got {plan.n_segments}") if plan.segment_seconds <= 0: raise ValueError(f"segment_seconds must be > 0; got {plan.segment_seconds}") src_duration_s = total_frames / plan.source_fps src_duration_ms = int(round(src_duration_s * 1000.0)) seg_ms = int(round(plan.segment_seconds * 1000.0)) segments: list[SegmentWindow] = [] for i in range(plan.n_segments): anchor_s = src_duration_s * (i + 1) / (plan.n_segments + 1) start_ms = int(round(anchor_s * 1000.0)) end_ms = min(start_ms + seg_ms, src_duration_ms) first_frame = int(round(start_ms / 1000.0 * plan.source_fps)) last_frame = int(round(end_ms / 1000.0 * plan.source_fps)) segments.append( SegmentWindow( start_ms=start_ms, end_ms=end_ms, first_frame_idx=first_frame, last_frame_idx=min(last_frame, total_frames), ) ) # AC-5 gap check. for prev, nxt in zip(segments, segments[1:]): gap_ms = nxt.start_ms - prev.end_ms if gap_ms < _MIN_INTER_SEGMENT_GAP_SECONDS * 1000: raise ValueError( f"infeasible plan: gap between segment ending at {prev.end_ms} ms " f"and segment starting at {nxt.start_ms} ms is {gap_ms} ms < " f"{int(_MIN_INTER_SEGMENT_GAP_SECONDS * 1000)} ms (AC-5). Reduce " "segment_seconds or n_segments, or use a longer source." ) # AC-5 coverage check. total_blackout_ms = sum(s.end_ms - s.start_ms for s in segments) fraction = total_blackout_ms / max(1, src_duration_ms) if fraction > _MAX_TOTAL_BLACKOUT_FRACTION: raise ValueError( f"infeasible plan: total blackout fraction is {fraction:.3f} " f"> {_MAX_TOTAL_BLACKOUT_FRACTION:.2f} (AC-5). Reduce " "segment_seconds or n_segments." ) return segments def _black_jpeg_bytes() -> bytes: from PIL import Image # noqa: PLC0415 — heavy import, deferred img = Image.new("RGB", (_TILE_W, _TILE_H), color=(0, 0, 0)) buf = io.BytesIO() img.save( buf, format="JPEG", quality=85, optimize=False, progressive=False, subsampling=2, ) return buf.getvalue() def build(plan: MultiSegmentPlan, out_root: Path) -> MultiSegmentReport: """Generate the multi-segment-derkachi fixture under ``out_root``.""" if out_root.exists(): shutil.rmtree(out_root) (out_root / "frames").mkdir(parents=True) src_dir = plan.source_frames_dir if not src_dir.is_dir(): raise FileNotFoundError(f"source frames directory not found: {src_dir}") frames = sorted(src_dir.glob("AD*.jpg")) if not frames: raise FileNotFoundError(f"no AD*.jpg frames under {src_dir}") total_frames = len(frames) src_duration_ms = int(round(total_frames / plan.source_fps * 1000.0)) segments = _plan_segments(plan, total_frames) black_jpeg = _black_jpeg_bytes() manifest_rows: list[dict] = [] blackout_set: set[int] = set() for seg_idx, seg in enumerate(segments): for f in range(seg.first_frame_idx, min(seg.last_frame_idx, total_frames)): blackout_set.add(f) manifest_rows.append( { "frame_idx": f, "src_jpeg_path": frames[f].name, "segment_idx": seg_idx, "segment_start_ms": seg.start_ms, "segment_end_ms": seg.end_ms, } ) for frame_idx, frame_path in enumerate(frames): out_path = out_root / "frames" / frame_path.name if frame_idx in blackout_set: out_path.write_bytes(black_jpeg) else: shutil.copy2(frame_path, out_path) _write_schedule(out_root, segments) _write_manifest(out_root, manifest_rows) total_blackout = sum(s.last_frame_idx - s.first_frame_idx for s in segments) fraction = (sum(s.end_ms - s.start_ms for s in segments)) / max(1, src_duration_ms) report = MultiSegmentReport( out_root=out_root, segments=segments, source_duration_ms=src_duration_ms, total_blackout_frames=total_blackout, total_blackout_fraction=fraction, ) _write_summary(out_root, report) return report def _write_schedule(out_root: Path, segments: list[SegmentWindow]) -> None: payload = { "segments": [ { "start_ms": s.start_ms, "end_ms": s.end_ms, "first_frame_idx": s.first_frame_idx, "last_frame_idx": s.last_frame_idx, } for s in segments ] } (out_root / "schedule.json").write_text( json.dumps(payload, sort_keys=True, indent=2) + "\n" ) def _write_manifest(out_root: Path, rows: list[dict]) -> None: manifest = out_root / "manifest.csv" with manifest.open("w", newline="") as fp: writer = csv.DictWriter( fp, fieldnames=["frame_idx", "src_jpeg_path", "segment_idx", "segment_start_ms", "segment_end_ms"], lineterminator="\n", ) writer.writeheader() for row in sorted(rows, key=lambda r: (r["segment_idx"], r["frame_idx"])): writer.writerow(row) def _write_summary(out_root: Path, report: MultiSegmentReport) -> None: payload = { "scenario": "multi-segment-derkachi", "n_segments": len(report.segments), "source_duration_ms": report.source_duration_ms, "total_blackout_frames": report.total_blackout_frames, "total_blackout_fraction": round(report.total_blackout_fraction, 6), "segments": [ {"start_ms": s.start_ms, "end_ms": s.end_ms} for s in report.segments ], } (out_root / "summary.json").write_text( json.dumps(payload, sort_keys=True, indent=2) + "\n" ) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Multi-segment blackout (FT-P-08)") parser.add_argument("--source-frames", type=Path, required=True) parser.add_argument("--n-segments", type=int, default=3) parser.add_argument("--segment-seconds", type=float, default=12.0) parser.add_argument("--source-fps", type=float, default=_DEFAULT_SRC_FPS) parser.add_argument("--seed", type=int, default=0) parser.add_argument( "--out-root", type=Path, default=None, help="Output dir. If omitted, /tmp//multi-segment/.", ) parser.add_argument("--run-id", default="local") parser.add_argument("--quiet", action="store_true") args = parser.parse_args(argv) logging.basicConfig( level=logging.WARNING if args.quiet else logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) out_root = args.out_root or tmpfs_root(args.run_id, "multi-segment") plan = MultiSegmentPlan( source_frames_dir=args.source_frames, n_segments=args.n_segments, segment_seconds=args.segment_seconds, source_fps=args.source_fps, seed=args.seed, ) report = build(plan, out_root) summary = { "scenario": "multi-segment-derkachi", "out_root": str(report.out_root), "n_segments": len(report.segments), "source_duration_ms": report.source_duration_ms, "total_blackout_frames": report.total_blackout_frames, "total_blackout_fraction": round(report.total_blackout_fraction, 6), } json.dump(summary, sys.stdout, sort_keys=True, indent=2) sys.stdout.write("\n") return 0 if __name__ == "__main__": raise SystemExit(main())