"""``ReplayInputAdapter`` (AZ-405 / E-DEMO-REPLAY). Layer-4 cross-cutting coordinator that converges ``(video, tlog)`` inputs into the standard :class:`FrameSource`, :class:`FcAdapter`, and :class:`Clock` surfaces consumed by the airborne composition root. Owns the time-alignment concern: either the operator's manual ``--time-offset-ms`` override or the AZ-405 IMU-take-off auto-detect. ``open()`` performs strict ordering so AC-13 holds: 1. **Tlog message-type pre-validation** runs FIRST so a tlog missing ``RAW_IMU`` / ``ATTITUDE`` raises before the video is ever read. 2. If the constructor received ``manual_time_offset_ms is None``, the auto-sync detectors run; otherwise the manual offset is adopted directly (AC-8 verifies the bypass). 3. The resolved offset is fed through the AC-9 frame-window match validator; a hard-fail raises ``"auto-sync hard-fail: …"`` so the shared main maps it to CLI exit code 2 (AC-7). 4. The :class:`Clock` strategy is constructed (``TlogDerivedClock`` for ``pace=ASAP``, ``WallClock`` for ``pace=REALTIME``) — the single instance the bundle ships to the composition root (Invariant 2; AC-5). 5. :class:`VideoFileFrameSource` and :class:`TlogReplayFcAdapter` are constructed against the offset + clock + dialect; the FC adapter's own ``open()`` triggers its independent pre-scan (a second sanity check; the operator gets the original error path if step 1 was bypassed via a test fake). 6. The bundle is returned with ``auto_sync_result`` populated for the auto path and ``None`` for the manual path. The coordinator is idempotent on ``close()`` — repeated calls are no-ops once the underlying strategies have been released (AC-12). """ from __future__ import annotations import logging from pathlib import Path from typing import TYPE_CHECKING, Any from gps_denied_onboard._types.fc import FcKind from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcAdapterConfigError, FcAdapterError, FcOpenError, ) from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( ReplayPace, TlogReplayFcAdapter, ) from gps_denied_onboard.fdr_client.records import FdrRecord from gps_denied_onboard.frame_source.errors import ( FrameSourceConfigError, FrameSourceError, ) from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now from gps_denied_onboard.replay_input.auto_sync import ( _load_tlog_samples, compute_offset, detect_video_motion_onset, find_aligned_window, validate_offset_or_fail, ) from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError from gps_denied_onboard.replay_input.interface import ( AlignedWindow, AutoSyncConfig, AutoSyncDecision, ReplayInputBundle, ) if TYPE_CHECKING: from gps_denied_onboard._types.calibration import CameraCalibration from gps_denied_onboard.clock import Clock from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.helpers.wgs_converter import WgsConverter __all__ = ["ReplayInputAdapter"] _FDR_PRODUCER_ID = "replay_input.tlog_video_adapter" _LOG_KIND_AUTO_SYNC_DETECTED = "replay.auto_sync.detected" _LOG_KIND_AUTO_SYNC_LOW_CONF = "replay.auto_sync.low_confidence" _LOG_KIND_AUTO_SYNC_AC8_FAIL = "replay.auto_sync.ac8_validation_failed" _LOG_KIND_OPEN_MANUAL = "replay.input.opened_manual_offset" _LOG_KIND_AUTO_TRIM_RESOLVED = "replay.auto_trim.resolved" _LOG_KIND_AUTO_TRIM_FALLBACK = "replay.auto_trim.fallback_to_takeoff" class ReplayInputAdapter: """Coordinator that converges ``(video, tlog)`` into the airborne strategies. Constructor parameters: - ``video_path`` / ``tlog_path`` — filesystem inputs. - ``camera_calibration`` — :class:`CameraCalibration` used to derive the calibration ID propagated into every emitted :class:`NavCameraFrame`. - ``target_fc_dialect`` — ``ARDUPILOT_PLANE`` or ``INAV``; passed through to :class:`TlogReplayFcAdapter`. - ``wgs_converter`` — shared geodesy helper, constructor-injected into :class:`TlogReplayFcAdapter`. - ``fdr_client`` — FDR sink for the TlogReplayFcAdapter and for the coordinator's own structured-event mirror. - ``pace`` — :class:`ReplayPace` (``ASAP`` or ``REALTIME``). - ``manual_time_offset_ms`` — ``None`` triggers auto-sync; an integer bypasses auto-sync DETECTION but the AC-9 frame-window validator still runs on the resolved offset (AC-8). - ``skip_auto_sync_validation`` — when ``True``, ALSO skip the AC-9 validator. Only legal in combination with a non-``None`` ``manual_time_offset_ms`` (the coordinator refuses both-None to avoid silent-zero offset bugs). Intended for fixtures where neither the IMU take-off detector nor the video motion-onset detector can produce a reliable signal (mid-flight clips, stationary still-image scenarios — see AZ-611). Default ``False``. - ``auto_sync_config`` — :class:`AutoSyncConfig` thresholds. Behaviour: - :meth:`open` resolves the offset, validates AC-9, and returns a :class:`ReplayInputBundle` with the wired strategies. Raises :class:`ReplayInputAdapterError` on every coordinator-scope failure so the shared main can map cleanly to CLI exit code 2. - :meth:`close` releases the FC adapter and the frame source; idempotent (AC-12). """ __slots__ = ( "_video_path", "_tlog_path", "_camera_calibration", "_target_fc_dialect", "_wgs_converter", "_fdr_client", "_pace", "_manual_time_offset_ms", "_skip_auto_sync_validation", "_auto_trim", "_auto_sync_config", "_tlog_source_factory", "_video_frames_factory", "_video_timestamps_factory", "_mavlink_transport", "_log", "_opened", "_closed", "_bundle", ) def __init__( self, *, video_path: Path, tlog_path: Path, camera_calibration: "CameraCalibration", target_fc_dialect: FcKind, wgs_converter: "WgsConverter", fdr_client: "FdrClient", pace: ReplayPace, manual_time_offset_ms: int | None, auto_sync_config: AutoSyncConfig, skip_auto_sync_validation: bool = False, auto_trim: bool = False, tlog_source_factory: Any | None = None, video_frames_factory: Any | None = None, video_timestamps_factory: Any | None = None, mavlink_transport: Any | None = None, ) -> None: if not isinstance(video_path, Path): raise ReplayInputAdapterError( f"video_path must be a pathlib.Path; got {type(video_path).__name__}" ) if not isinstance(tlog_path, Path): raise ReplayInputAdapterError( f"tlog_path must be a pathlib.Path; got {type(tlog_path).__name__}" ) if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV): raise ReplayInputAdapterError( f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; " f"got {target_fc_dialect!r}" ) if not isinstance(pace, ReplayPace): raise ReplayInputAdapterError( f"pace must be a ReplayPace enum; got {type(pace).__name__}" ) if not isinstance(skip_auto_sync_validation, bool): raise ReplayInputAdapterError( "skip_auto_sync_validation must be a bool; got " f"{type(skip_auto_sync_validation).__name__}" ) if skip_auto_sync_validation and manual_time_offset_ms is None: # Mirror the ReplayConfig.__post_init__ gate. Without a # manual offset there is no operator-acknowledged value # to skip validation against — auto-sync would compute # an offset of unknown quality and the validator that # would catch a bad detection is disabled. Refuse so # this can't silently mask a wrong offset. raise ReplayInputAdapterError( "skip_auto_sync_validation=True requires " "manual_time_offset_ms to be set" ) if not isinstance(auto_trim, bool): raise ReplayInputAdapterError( "auto_trim must be a bool; got " f"{type(auto_trim).__name__}" ) if auto_trim and manual_time_offset_ms is not None: # Mirror the ReplayConfig.__post_init__ gate. An explicit # manual offset means the operator has already aligned # the streams; running the cross-correlation aligner on # top of that would either re-resolve the same window # (wasteful) or overwrite the operator's intent silently. raise ReplayInputAdapterError( "auto_trim=True is mutually exclusive with " "manual_time_offset_ms" ) self._video_path = video_path self._tlog_path = tlog_path self._camera_calibration = camera_calibration self._target_fc_dialect = target_fc_dialect self._wgs_converter = wgs_converter self._fdr_client = fdr_client self._pace = pace self._manual_time_offset_ms = manual_time_offset_ms self._skip_auto_sync_validation = skip_auto_sync_validation self._auto_trim = auto_trim self._auto_sync_config = auto_sync_config self._tlog_source_factory = tlog_source_factory self._video_frames_factory = video_frames_factory self._video_timestamps_factory = video_timestamps_factory self._mavlink_transport = mavlink_transport self._log = logging.getLogger("replay_input.tlog_video_adapter") self._opened = False self._closed = False self._bundle: ReplayInputBundle | None = None def open(self) -> ReplayInputBundle: """Resolve the offset, build the strategies, return the bundle. Idempotent only in the failure-then-retry sense — calling ``open()`` twice without an intervening ``close()`` raises :class:`ReplayInputAdapterError`. """ if self._opened: raise ReplayInputAdapterError("ReplayInputAdapter already opened") # Step 1 — tlog presence + required-message check (R-DEMO-3, # AC-13). Runs BEFORE any video read so a malformed tlog # surfaces without paying the cv2.VideoCapture cost. tlog_imu_timestamps_ns, tlog_samples_for_auto = self._load_and_validate_tlog() # Step 2 — resolve the offset (auto-sync, auto-trim, or # manual override). decision: AutoSyncDecision | None aligned_window: AlignedWindow | None if self._auto_trim: aligned_window = self._run_auto_trim() decision = None resolved_offset_ms = aligned_window.offset_ms # The prescan timestamps (step 1) only cover the tlog head. # When the auto-trim window is far into the tlog, the prescan # timestamps fall outside the window and the AC-9 validator # would always return 0 % match → false hard-fail. Reload # IMU timestamps from the discovered window so the validator # sees the correct slice. if aligned_window.tlog_start_ns > 0: tlog_imu_timestamps_ns = self._load_tlog_imu_in_window( aligned_window.tlog_start_ns, aligned_window.tlog_end_ns, ) self._log.info( "replay_input.ac9_window_reload: " "tlog_start_ns=%d tlog_end_ns=%d loaded=%d imu_samples", aligned_window.tlog_start_ns, aligned_window.tlog_end_ns, len(tlog_imu_timestamps_ns), extra={ "kind": "replay_input.ac9_window_reload", "kv": { "tlog_start_ns": aligned_window.tlog_start_ns, "tlog_end_ns": aligned_window.tlog_end_ns, "loaded_imu_count": len(tlog_imu_timestamps_ns), }, }, ) elif self._manual_time_offset_ms is None: aligned_window = None decision = self._run_auto_sync(tlog_samples_for_auto) resolved_offset_ms = decision.offset_ms else: aligned_window = None decision = None resolved_offset_ms = int(self._manual_time_offset_ms) self._log.info( f"{_LOG_KIND_OPEN_MANUAL}: resolved_offset_ms={resolved_offset_ms}", extra={ "kind": _LOG_KIND_OPEN_MANUAL, "kv": {"resolved_offset_ms": resolved_offset_ms}, }, ) # Step 3 — load video frame timestamps and run AC-9 validator # unless the operator explicitly opted out via # skip_auto_sync_validation (AZ-611). The opt-out is meant for # mid-flight + stationary fixtures where neither detector can # produce a reliable signal; the constructor already enforced # that the opt-out requires a manual offset. video_frame_timestamps_ns = self._load_video_timestamps() if self._skip_auto_sync_validation: self._log.info( f"{_LOG_KIND_OPEN_MANUAL}: ac9_validator_skipped " f"(resolved_offset_ms={resolved_offset_ms})", extra={ "kind": _LOG_KIND_OPEN_MANUAL, "kv": { "resolved_offset_ms": resolved_offset_ms, "ac9_validator_skipped": True, }, }, ) else: result_code = validate_offset_or_fail( resolved_offset_ms, tlog_imu_timestamps_ns, video_frame_timestamps_ns, threshold_pct=self._auto_sync_config.match_threshold_pct, window_ms=self._auto_sync_config.match_window_ms, ) if result_code != 0: self._raise_ac8_fail( resolved_offset_ms, len(tlog_imu_timestamps_ns), len(video_frame_timestamps_ns), ) # Step 4 — clock strategy (single instance per Invariant 2). clock = self._build_clock() # Step 5 — concrete strategies. The frame source is built # first because its constructor verifies the build flag and # opens the cv2 capture handle — a failure here is a clean # config error (no resources held). The FC adapter is built # second; its open() launches the decode thread. try: frame_source = VideoFileFrameSource( path=self._video_path, camera_calibration_id=self._camera_calibration.camera_id, clock=clock, ) except FrameSourceConfigError as exc: raise ReplayInputAdapterError( f"video file unreadable / unsupported codec: {self._video_path} " f"({exc})" ) from exc except FrameSourceError as exc: raise ReplayInputAdapterError( f"video file decode error: {self._video_path} ({exc})" ) from exc try: fc_adapter = TlogReplayFcAdapter( tlog_path=self._tlog_path, target_fc_dialect=self._target_fc_dialect, clock=clock, wgs_converter=self._wgs_converter, fdr_client=self._fdr_client, time_offset_ms=resolved_offset_ms, tlog_start_ns=( aligned_window.tlog_start_ns if aligned_window is not None else None ), pace=self._pace, source_factory=self._tlog_source_factory, mavlink_transport=self._mavlink_transport, ) fc_adapter.open() except (FcOpenError, FcAdapterConfigError, FcAdapterError) as exc: # Release the already-built frame source so we do not # leak the cv2 handle when the FC adapter fails after # the video was opened. try: frame_source.close() except Exception: # pragma: no cover — defensive. self._log.debug( "ReplayInputAdapter: frame_source.close() during FC adapter rollback failed", exc_info=True, ) # Translate the FC error into the coordinator's single # public failure shape so the CLI exit-code mapping # remains single-source. Pre-scan failures naturally # surface the "tlog missing required messages: …" prefix # the contract mandates. raise ReplayInputAdapterError(str(exc)) from exc # Step 6 — assemble + record the bundle. bundle = ReplayInputBundle( frame_source=frame_source, fc_adapter=fc_adapter, clock=clock, resolved_time_offset_ms=resolved_offset_ms, auto_sync_result=decision, aligned_window=aligned_window, ) self._bundle = bundle self._opened = True return bundle def close(self) -> None: """Release the FC adapter + frame source; idempotent (AC-12).""" if self._closed: self._log.debug( "ReplayInputAdapter.close called twice; no-op" ) return self._closed = True bundle = self._bundle self._bundle = None if bundle is None: return try: bundle.fc_adapter.close() except Exception: # pragma: no cover — defensive. self._log.debug( "ReplayInputAdapter: fc_adapter.close() raised", exc_info=True ) try: bundle.frame_source.close() except Exception: # pragma: no cover — defensive. self._log.debug( "ReplayInputAdapter: frame_source.close() raised", exc_info=True ) # ------------------------------------------------------------------ # Internal helpers def _load_and_validate_tlog( self, ) -> tuple[list[int], Any]: """Load tlog IMU + ATTITUDE samples; raise on missing types. Returns the IMU-only timestamp list (used by the AC-9 validator) plus the full :class:`TlogSamples` so the auto- sync path can reuse the same scan for take-off detection. Raises :class:`ReplayInputAdapterError` for the R-DEMO-3 missing-types path; this is the AC-13 fail-fast surface. """ if not self._tlog_path.is_file(): raise ReplayInputAdapterError( f"tlog file not found: {self._tlog_path}" ) samples = _load_tlog_samples( self._tlog_path, self._auto_sync_config.prescan_max_messages, source_factory=self._tlog_source_factory, ) if not samples.accel: raise ReplayInputAdapterError( "tlog missing required message types: ['RAW_IMU', 'SCALED_IMU2']" ) if not samples.attitude: raise ReplayInputAdapterError( "tlog missing required message types: ['ATTITUDE']" ) return [ts for ts, _ in samples.accel], samples def _run_auto_trim(self) -> AlignedWindow: """AZ-698 auto-trim path — cross-correlate IMU energy ↔ optical flow. Returns the located :class:`AlignedWindow`. When the correlation peak falls below :attr:`AutoSyncConfig.alignment_low_confidence_threshold`, :func:`find_aligned_window` falls back to the AZ-405 head-takeoff detector and sets ``fallback_used=True`` — the coordinator logs WARN but still proceeds (the AC-9 frame-window validator runs in Step 3 and will hard-fail if the resolved offset is bad). """ window = find_aligned_window( self._tlog_path, self._video_path, self._auto_sync_config, self._target_fc_dialect, tlog_source_factory=self._tlog_source_factory, video_frames_factory=self._video_frames_factory, ) kind = ( _LOG_KIND_AUTO_TRIM_FALLBACK if window.fallback_used else _LOG_KIND_AUTO_TRIM_RESOLVED ) level = "WARN" if window.fallback_used else "INFO" kv = { "tlog_start_ns": window.tlog_start_ns, "tlog_end_ns": window.tlog_end_ns, "offset_ms": window.offset_ms, "confidence": window.confidence, "fallback_used": window.fallback_used, "flight_count_detected": window.flight_count_detected, "selected_flight_index": window.selected_flight_index, } msg = ( f"{kind}: tlog_start_ns={window.tlog_start_ns} " f"offset_ms={window.offset_ms} confidence={window.confidence:.3f} " f"flights_detected={window.flight_count_detected} " f"selected_flight={window.selected_flight_index}" ) if window.fallback_used: self._log.warning(msg, extra={"kind": kind, "kv": kv}) else: self._log.info(msg, extra={"kind": kind, "kv": kv}) self._emit_fdr_event(level=level, log_kind=kind, msg=msg, kv=kv) return window def _run_auto_sync(self, tlog_samples: Any) -> AutoSyncDecision: """Auto path — compute the take-off / motion-onset / offset. Re-uses the already-loaded ``tlog_samples`` for the take-off detector so the tlog is walked exactly once per ``open()`` regardless of which path runs. """ from gps_denied_onboard.replay_input.auto_sync import ( _compute_tlog_takeoff_from_samples, ) tlog_result = _compute_tlog_takeoff_from_samples( tlog_samples, self._auto_sync_config ) video_result = detect_video_motion_onset( self._video_path, self._auto_sync_config, frames_factory=self._video_frames_factory, ) decision = compute_offset(tlog_result, video_result) if decision.combined_confidence < self._auto_sync_config.low_confidence_threshold: self._log_decision( kind=_LOG_KIND_AUTO_SYNC_LOW_CONF, level="WARN", decision=decision, extra_kv={"proceeding_with_best_guess": True}, ) else: self._log_decision( kind=_LOG_KIND_AUTO_SYNC_DETECTED, level="INFO", decision=decision, extra_kv={}, ) return decision def _load_video_timestamps(self) -> list[int]: """Decode the leading video segment, return per-frame timestamps. Used by the AC-9 frame-window match validator and as a fallback when the auto-sync video scan was bypassed (manual path). Stops at ``video_motion_scan_seconds`` so wildly long clips do not hold up startup. """ if self._video_timestamps_factory is not None: return list(self._video_timestamps_factory(self._video_path)) try: import cv2 as _cv2 # type: ignore[import-not-found] except ImportError as exc: raise ReplayInputAdapterError( "opencv-python is required for replay auto-sync but is " "not importable in this binary" ) from exc capture = _cv2.VideoCapture(str(self._video_path)) if not capture.isOpened(): capture.release() raise ReplayInputAdapterError( f"video file unreadable / unsupported codec: {self._video_path}" ) out: list[int] = [] max_pos_ms = self._auto_sync_config.video_motion_scan_seconds * 1000.0 try: while True: ok = capture.grab() if not ok: break pos_ms = float(capture.get(_cv2.CAP_PROP_POS_MSEC)) if pos_ms > max_pos_ms: break out.append(int(pos_ms * 1_000_000)) finally: capture.release() return out def _load_tlog_imu_in_window( self, start_ns: int, end_ns: int, ) -> list[int]: """Load tlog IMU timestamps from [start_ns, end_ns]. Used by the AC-9 validator in auto-trim mode. The prescan (step 1) only covers the tlog head; when the identified window is later in the file this method re-scans to find IMU samples in the correct range. Sequential scan is unavoidable (pymavlink does not seek), but only IMU message types are matched so the scan is fast in practice. """ from gps_denied_onboard.replay_input.auto_sync import _open_tlog source = _open_tlog(self._tlog_path, source_factory=self._tlog_source_factory) timestamps: list[int] = [] try: while True: try: msg = source.recv_match( type=["RAW_IMU", "SCALED_IMU2"], blocking=False, ) except Exception as exc: raise ReplayInputAdapterError( f"tlog scan for AC-9 window failed: {exc!r}" ) from exc if msg is None: break raw = getattr(msg, "_timestamp", None) if raw is None: continue ts_ns = int(float(raw) * 1_000_000_000) if ts_ns < start_ns: continue if ts_ns > end_ns: break timestamps.append(ts_ns) finally: if hasattr(source, "close"): try: source.close() except Exception: pass return timestamps def _build_clock(self) -> "Clock": """Pick the :class:`Clock` strategy per pace; single instance. The ``TlogDerivedClock`` is constructed against an empty iterable here: the composition root (AZ-401) is responsible for hooking the clock's source up to the live tlog cursor once the FC adapter's decode thread starts streaming. The empty-source default keeps unit tests self-contained. """ if self._pace is ReplayPace.ASAP: return TlogDerivedClock(source=iter([])) return WallClock() def _log_decision( self, *, kind: str, level: str, decision: AutoSyncDecision, extra_kv: dict[str, Any], ) -> None: kv: dict[str, Any] = { "tlog_takeoff_ns": decision.tlog_takeoff_ns, "video_motion_onset_ns": decision.video_motion_onset_ns, "offset_ms": decision.offset_ms, "tlog_confidence": decision.tlog_confidence, "video_confidence": decision.video_confidence, "combined_confidence": decision.combined_confidence, } kv.update(extra_kv) msg = f"{kind}: offset_ms={decision.offset_ms} confidence={decision.combined_confidence:.3f}" if level == "WARN": self._log.warning(msg, extra={"kind": kind, "kv": kv}) else: self._log.info(msg, extra={"kind": kind, "kv": kv}) self._emit_fdr_event(level=level, log_kind=kind, msg=msg, kv=kv) def _raise_ac8_fail( self, offset_ms: int, imu_count: int, frame_count: int, ) -> None: kv = { "offset_ms": offset_ms, "frame_window_match_pct_threshold": self._auto_sync_config.match_threshold_pct, "imu_sample_count": imu_count, "video_frame_count": frame_count, } msg = ( f"auto-sync hard-fail: frame-window match below " f"{self._auto_sync_config.match_threshold_pct}% with " f"offset_ms={offset_ms}" ) self._log.error( f"{_LOG_KIND_AUTO_SYNC_AC8_FAIL}: {msg}", extra={"kind": _LOG_KIND_AUTO_SYNC_AC8_FAIL, "kv": kv}, ) self._emit_fdr_event( level="ERROR", log_kind=_LOG_KIND_AUTO_SYNC_AC8_FAIL, msg=msg, kv=kv ) raise ReplayInputAdapterError(msg) def _emit_fdr_event( self, *, level: str, log_kind: str, msg: str, kv: dict[str, Any], ) -> None: record = FdrRecord( schema_version=1, ts=iso_ts_now(), producer_id=_FDR_PRODUCER_ID, kind="log", payload={ "level": level, "component": "replay_input", "kind": log_kind, "msg": msg, "kv": kv, }, ) try: self._fdr_client.enqueue(record) except Exception as exc: self._log.debug( f"replay_input.fdr_enqueue_failed: {exc!r}", extra={ "kind": "replay_input.fdr_enqueue_failed", "kv": {"error": repr(exc), "downstream_kind": log_kind}, }, )