mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 16:31:14 +00:00
Update autodev state documentation to reflect progress in the Decompose phase, changing the current step from 5 to 6. Revise sub-step details to indicate a shift to phase 2, focusing on module layout for the Satellite Service and Tile Manager, and awaiting confirmation before product task decomposition. Additionally, enhance problem documentation to clarify the original still-image sample limitations and introduce the Derkachi representative fixture for improved data validation. Update references to the Tile Manager and Satellite Service throughout the documentation for consistency.
This commit is contained in:
@@ -0,0 +1,146 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Remove neon OSD strokes from a video using color masking and inpainting."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def build_mask(frame: np.ndarray, dilate: int) -> np.ndarray:
|
||||
"""Return a mask for bright green/cyan OSD strokes."""
|
||||
b, g, r = cv2.split(frame)
|
||||
|
||||
# The OSD strokes are neon-like: green/cyan channels are high and dominate red.
|
||||
green = (g > 95) & ((g.astype(np.int16) - r.astype(np.int16)) > 22) & (
|
||||
(g.astype(np.int16) - b.astype(np.int16)) > 0
|
||||
)
|
||||
cyan = (g > 105) & (b > 95) & ((g.astype(np.int16) - r.astype(np.int16)) > 28)
|
||||
|
||||
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
|
||||
hue = hsv[:, :, 0]
|
||||
sat = hsv[:, :, 1]
|
||||
val = hsv[:, :, 2]
|
||||
neon_hsv = (sat > 55) & (val > 85) & (hue >= 35) & (hue <= 105)
|
||||
|
||||
mask = ((green | cyan) & neon_hsv).astype(np.uint8) * 255
|
||||
|
||||
# Remove isolated natural-color speckles, then expand just enough to cover antialiasing.
|
||||
open_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
|
||||
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, open_kernel)
|
||||
if dilate > 0:
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * dilate + 1, 2 * dilate + 1))
|
||||
mask = cv2.dilate(mask, kernel, iterations=1)
|
||||
return mask
|
||||
|
||||
|
||||
def process_video(
|
||||
input_path: Path,
|
||||
output_path: Path,
|
||||
temp_video_path: Path,
|
||||
*,
|
||||
seconds: float | None,
|
||||
mask_preview: bool,
|
||||
method: str,
|
||||
radius: float,
|
||||
dilate: int,
|
||||
) -> None:
|
||||
cap = cv2.VideoCapture(str(input_path))
|
||||
if not cap.isOpened():
|
||||
raise RuntimeError(f"Could not open input video: {input_path}")
|
||||
|
||||
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
|
||||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
frame_limit = total_frames if seconds is None else min(total_frames, int(round(seconds * fps)))
|
||||
|
||||
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
|
||||
writer = cv2.VideoWriter(str(temp_video_path), fourcc, fps, (width, height))
|
||||
if not writer.isOpened():
|
||||
raise RuntimeError(f"Could not open temporary output video: {temp_video_path}")
|
||||
|
||||
frame_index = 0
|
||||
while frame_index < frame_limit:
|
||||
ok, frame = cap.read()
|
||||
if not ok:
|
||||
break
|
||||
|
||||
mask = build_mask(frame, dilate=dilate)
|
||||
if mask_preview:
|
||||
rendered = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
|
||||
else:
|
||||
inpaint_method = cv2.INPAINT_NS if method == "ns" else cv2.INPAINT_TELEA
|
||||
rendered = cv2.inpaint(frame, mask, radius, inpaint_method)
|
||||
|
||||
writer.write(rendered)
|
||||
frame_index += 1
|
||||
if frame_index % int(max(fps * 10, 1)) == 0:
|
||||
print(f"processed {frame_index}/{frame_limit} frames", flush=True)
|
||||
|
||||
writer.release()
|
||||
cap.release()
|
||||
|
||||
|
||||
def mux_audio(input_path: Path, temp_video_path: Path, output_path: Path, seconds: float | None) -> None:
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-i",
|
||||
str(temp_video_path),
|
||||
"-i",
|
||||
str(input_path),
|
||||
"-map",
|
||||
"0:v:0",
|
||||
"-map",
|
||||
"1:a?",
|
||||
"-c:v",
|
||||
"libx264",
|
||||
"-crf",
|
||||
"18",
|
||||
"-preset",
|
||||
"medium",
|
||||
"-c:a",
|
||||
"copy",
|
||||
]
|
||||
if seconds is not None:
|
||||
cmd.extend(["-t", str(seconds)])
|
||||
cmd.append(str(output_path))
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("input", type=Path)
|
||||
parser.add_argument("output", type=Path)
|
||||
parser.add_argument("--seconds", type=float, default=None)
|
||||
parser.add_argument("--mask-preview", action="store_true")
|
||||
parser.add_argument("--method", choices=["telea", "ns"], default="telea")
|
||||
parser.add_argument("--radius", type=float, default=3.0)
|
||||
parser.add_argument("--dilate", type=int, default=1)
|
||||
args = parser.parse_args()
|
||||
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
with tempfile.TemporaryDirectory(prefix="osd_inpaint_") as temp_dir:
|
||||
temp_video_path = Path(temp_dir) / "video_no_audio.mp4"
|
||||
process_video(
|
||||
args.input,
|
||||
args.output,
|
||||
temp_video_path,
|
||||
seconds=args.seconds,
|
||||
mask_preview=args.mask_preview,
|
||||
method=args.method,
|
||||
radius=args.radius,
|
||||
dilate=args.dilate,
|
||||
)
|
||||
mux_audio(args.input, temp_video_path, args.output, args.seconds)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user