mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 10:11:12 +00:00
147 lines
4.5 KiB
Python
147 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Remove neon OSD strokes from a video using color masking and inpainting."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def build_mask(frame: np.ndarray, dilate: int) -> np.ndarray:
    """Build a 0/255 uint8 mask marking bright green/cyan OSD stroke pixels.

    A pixel is selected when it passes either the "green" or the "cyan"
    channel test AND falls inside the saturation/value/hue window computed
    from the HSV conversion of the same frame.

    Args:
        frame: Image to analyse. It is split as (b, g, r) and converted with
            ``cv2.COLOR_BGR2HSV``, so a 3-channel BGR image is expected
            (presumably uint8 -- confirm against the caller).
        dilate: Dilation radius in pixels; the elliptical kernel is
            ``2 * dilate + 1`` wide. Values ``<= 0`` skip dilation.

    Returns:
        Single-channel uint8 mask with 255 where a stroke was detected.
    """
    blue, green_ch, red = cv2.split(frame)

    # Work in int16 so that channel differences can go negative instead of
    # wrapping around as they would in unsigned 8-bit arithmetic.
    green_i16 = green_ch.astype(np.int16)
    green_minus_red = green_i16 - red.astype(np.int16)
    green_minus_blue = green_i16 - blue.astype(np.int16)

    # Neon-like strokes: green is high and dominates red (and, for the pure
    # green case, blue as well); cyan additionally needs a high blue channel.
    looks_green = (green_ch > 95) & (green_minus_red > 22) & (green_minus_blue > 0)
    looks_cyan = (green_ch > 105) & (blue > 95) & (green_minus_red > 28)

    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    hue_ch = hsv_frame[..., 0]
    sat_ch = hsv_frame[..., 1]
    val_ch = hsv_frame[..., 2]
    in_neon_window = (sat_ch > 55) & (val_ch > 85) & (hue_ch >= 35) & (hue_ch <= 105)

    selected = (looks_green | looks_cyan) & in_neon_window
    stroke_mask = np.where(selected, np.uint8(255), np.uint8(0))

    # Morphological opening drops isolated speckles coming from natural colors.
    speckle_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    stroke_mask = cv2.morphologyEx(stroke_mask, cv2.MORPH_OPEN, speckle_kernel)

    if dilate <= 0:
        return stroke_mask

    # Grow the mask just enough to also cover the antialiased stroke edges.
    side = 2 * dilate + 1
    grow_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (side, side))
    return cv2.dilate(stroke_mask, grow_kernel, iterations=1)
|
|
|
|
|
|
def process_video(
    input_path: Path,
    output_path: Path,
    temp_video_path: Path,
    *,
    seconds: float | None,
    mask_preview: bool,
    method: str,
    radius: float,
    dilate: int,
) -> None:
    """Read a video, remove (or visualise) OSD strokes and write the frames.

    Each frame is passed through ``build_mask``; the result is either the
    mask itself (preview mode) or the frame inpainted under that mask. The
    rendered frames are written, without audio, to ``temp_video_path`` using
    the ``mp4v`` fourcc.

    Args:
        input_path: Video file to read.
        output_path: Not used by this function; kept so the call signature
            stays unchanged for existing callers.
        temp_video_path: Destination of the silent intermediate video.
        seconds: If given, process at most ``round(seconds * fps)`` frames.
        mask_preview: Write the mask (as a BGR image) instead of the
            inpainted frame.
        method: ``"ns"`` selects ``cv2.INPAINT_NS``; anything else selects
            ``cv2.INPAINT_TELEA``.
        radius: Inpainting radius passed to ``cv2.inpaint``.
        dilate: Mask dilation radius forwarded to ``build_mask``.

    Raises:
        RuntimeError: If the input video or the temporary output video
            cannot be opened.
    """
    cap = cv2.VideoCapture(str(input_path))
    writer = None
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open input video: {input_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Covers 0, negative and NaN readings from the backend.
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # The reported frame count is only trusted when positive; otherwise
        # the loop runs until the reader reports end-of-stream (or until the
        # optional ``seconds`` cap is reached).
        caps = []
        if total_frames > 0:
            caps.append(total_frames)
        if seconds is not None:
            caps.append(int(round(seconds * fps)))
        frame_limit = min(caps) if caps else None
        total_label = "?" if frame_limit is None else str(frame_limit)

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(temp_video_path), fourcc, fps, (width, height))
        if not writer.isOpened():
            raise RuntimeError(f"Could not open temporary output video: {temp_video_path}")

        # Loop invariants.
        inpaint_method = cv2.INPAINT_NS if method == "ns" else cv2.INPAINT_TELEA
        progress_every = int(max(fps * 10, 1))  # roughly every 10 s of video

        frame_index = 0
        while frame_limit is None or frame_index < frame_limit:
            ok, frame = cap.read()
            if not ok:
                break

            mask = build_mask(frame, dilate=dilate)
            if mask_preview:
                rendered = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            else:
                rendered = cv2.inpaint(frame, mask, radius, inpaint_method)

            writer.write(rendered)
            frame_index += 1
            if frame_index % progress_every == 0:
                print(f"processed {frame_index}/{total_label} frames", flush=True)
    finally:
        # Always release the native handles, including on error paths, so the
        # temporary file is finalised and the input is closed.
        if writer is not None:
            writer.release()
        cap.release()
|
|
|
|
|
|
def mux_audio(input_path: Path, temp_video_path: Path, output_path: Path, seconds: float | None) -> None:
    """Re-encode the processed video with ffmpeg and attach the source audio.

    Video is taken from ``temp_video_path`` and encoded with libx264
    (CRF 18, preset ``medium``); audio streams, if any, are copied from
    ``input_path`` without re-encoding. An existing output file is
    overwritten (``-y``).

    Args:
        input_path: Original video, used only as the audio source.
        temp_video_path: Silent intermediate video holding the new frames.
        output_path: Final file to write.
        seconds: When not ``None``, passed to ffmpeg as ``-t`` to limit the
            output duration.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    sources = ["-i", str(temp_video_path), "-i", str(input_path)]
    # First input supplies the video; the trailing "?" makes the audio
    # mapping from the second input optional.
    stream_maps = ["-map", "0:v:0", "-map", "1:a?"]
    codec_opts = ["-c:v", "libx264", "-crf", "18", "-preset", "medium", "-c:a", "copy"]
    duration_opts = [] if seconds is None else ["-t", str(seconds)]

    command = [
        "ffmpeg",
        "-hide_banner",
        "-y",
        *sources,
        *stream_maps,
        *codec_opts,
        *duration_opts,
        str(output_path),
    ]
    subprocess.run(command, check=True)
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the two-stage pipeline.

    Stage one (``process_video``) writes a silent, processed video into a
    temporary directory; stage two (``mux_audio``) encodes it to the final
    output path together with the audio of the input. The output's parent
    directory is created if missing, and the temporary directory is removed
    when the pipeline finishes.
    """
    cli = argparse.ArgumentParser()
    for positional in ("input", "output"):
        cli.add_argument(positional, type=Path)
    cli.add_argument("--seconds", type=float, default=None)
    cli.add_argument("--mask-preview", action="store_true")
    cli.add_argument("--method", choices=["telea", "ns"], default="telea")
    cli.add_argument("--radius", type=float, default=3.0)
    cli.add_argument("--dilate", type=int, default=1)
    opts = cli.parse_args()

    source, destination = opts.input, opts.output
    destination.parent.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory(prefix="osd_inpaint_") as scratch_dir:
        silent_video = Path(scratch_dir, "video_no_audio.mp4")
        process_video(
            source,
            destination,
            silent_video,
            seconds=opts.seconds,
            mask_preview=opts.mask_preview,
            method=opts.method,
            radius=opts.radius,
            dilate=opts.dilate,
        )
        mux_audio(source, silent_video, destination, opts.seconds)
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
|