Files

147 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""Remove neon OSD strokes from a video using color masking and inpainting."""
from __future__ import annotations
import argparse
import subprocess
import tempfile
from pathlib import Path
import cv2
import numpy as np
def build_mask(frame: np.ndarray, dilate: int) -> np.ndarray:
"""Return a mask for bright green/cyan OSD strokes."""
b, g, r = cv2.split(frame)
# The OSD strokes are neon-like: green/cyan channels are high and dominate red.
green = (g > 95) & ((g.astype(np.int16) - r.astype(np.int16)) > 22) & (
(g.astype(np.int16) - b.astype(np.int16)) > 0
)
cyan = (g > 105) & (b > 95) & ((g.astype(np.int16) - r.astype(np.int16)) > 28)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
hue = hsv[:, :, 0]
sat = hsv[:, :, 1]
val = hsv[:, :, 2]
neon_hsv = (sat > 55) & (val > 85) & (hue >= 35) & (hue <= 105)
mask = ((green | cyan) & neon_hsv).astype(np.uint8) * 255
# Remove isolated natural-color speckles, then expand just enough to cover antialiasing.
open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, open_kernel)
if dilate > 0:
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * dilate + 1, 2 * dilate + 1))
mask = cv2.dilate(mask, kernel, iterations=1)
return mask
def process_video(
input_path: Path,
output_path: Path,
temp_video_path: Path,
*,
seconds: float | None,
mask_preview: bool,
method: str,
radius: float,
dilate: int,
) -> None:
cap = cv2.VideoCapture(str(input_path))
if not cap.isOpened():
raise RuntimeError(f"Could not open input video: {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_limit = total_frames if seconds is None else min(total_frames, int(round(seconds * fps)))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(str(temp_video_path), fourcc, fps, (width, height))
if not writer.isOpened():
raise RuntimeError(f"Could not open temporary output video: {temp_video_path}")
frame_index = 0
while frame_index < frame_limit:
ok, frame = cap.read()
if not ok:
break
mask = build_mask(frame, dilate=dilate)
if mask_preview:
rendered = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
else:
inpaint_method = cv2.INPAINT_NS if method == "ns" else cv2.INPAINT_TELEA
rendered = cv2.inpaint(frame, mask, radius, inpaint_method)
writer.write(rendered)
frame_index += 1
if frame_index % int(max(fps * 10, 1)) == 0:
print(f"processed {frame_index}/{frame_limit} frames", flush=True)
writer.release()
cap.release()
def mux_audio(input_path: Path, temp_video_path: Path, output_path: Path, seconds: float | None) -> None:
cmd = [
"ffmpeg",
"-hide_banner",
"-y",
"-i",
str(temp_video_path),
"-i",
str(input_path),
"-map",
"0:v:0",
"-map",
"1:a?",
"-c:v",
"libx264",
"-crf",
"18",
"-preset",
"medium",
"-c:a",
"copy",
]
if seconds is not None:
cmd.extend(["-t", str(seconds)])
cmd.append(str(output_path))
subprocess.run(cmd, check=True)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("input", type=Path)
parser.add_argument("output", type=Path)
parser.add_argument("--seconds", type=float, default=None)
parser.add_argument("--mask-preview", action="store_true")
parser.add_argument("--method", choices=["telea", "ns"], default="telea")
parser.add_argument("--radius", type=float, default=3.0)
parser.add_argument("--dilate", type=int, default=1)
args = parser.parse_args()
args.output.parent.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix="osd_inpaint_") as temp_dir:
temp_video_path = Path(temp_dir) / "video_no_audio.mp4"
process_video(
args.input,
args.output,
temp_video_path,
seconds=args.seconds,
mask_preview=args.mask_preview,
method=args.method,
radius=args.radius,
dilate=args.dilate,
)
mux_audio(args.input, temp_video_path, args.output, args.seconds)
if __name__ == "__main__":
main()