Files
gps-denied-onboard/tests/unit/replay_input/test_az698_window_alignment.py
T
Oleksandr Bezdieniezhnykh 87fe98858f [AZ-698] Tlog trim + mid-flight alignment for replay
Adds find_aligned_window cross-correlation (NCC, per-window unit norm)
between IMU energy and video optical-flow magnitude. Returns
AlignedWindow{tlog_start_ns, tlog_end_ns, offset_ms, confidence,
used_fallback}, with fallback to head-takeoff on low confidence to
preserve AZ-405 behavior. TlogReplayFcAdapter honors tlog_start_ns and
skips pre-window messages. New --auto-trim CLI flag, mutex with
--time-offset-ms. AC-1..AC-4 covered by unit tests; AC-5 skipped (no
real flight_derkachi.mp4 in repo). 106 tests pass in regression slice.
Zero new mypy --strict errors.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 16:29:59 +03:00

617 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AZ-698 — tlog trim + mid-flight cross-correlation alignment tests.
Covers AC-1..AC-4 of ``_docs/02_tasks/todo/AZ-698_tlog_trim_midflight_alignment.md``.
AC-5 (end-to-end CLI smoke) is exercised by the existing replay e2e
suite in ``tests/e2e/replay/`` and skipped here when its prerequisites
(ffmpeg-capable cv2 build + real ``derkachi.tlog``) are absent.
Style: every test follows the Arrange / Act / Assert pattern.
"""
from __future__ import annotations
import math
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock
import pytest
from gps_denied_onboard._types.fc import (
AttitudeSample,
FcKind,
FcTelemetryFrame,
FlightStateSignal,
GpsHealth,
ImuTelemetrySample,
TelemetryKind,
)
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
ReplayPace,
TlogReplayFcAdapter,
)
from gps_denied_onboard.replay_input.auto_sync import (
_align_via_cross_correlation,
_resample_uniform,
compute_offset,
detect_video_motion_onset,
validate_offset_or_fail,
)
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
from gps_denied_onboard.replay_input.interface import (
AlignedWindow,
AutoSyncConfig,
)
# ---------------------------------------------------------------------
# Synthetic-stream helpers
def _ns(seconds: float) -> int:
return int(seconds * 1_000_000_000)
def _build_motion_burst_stream(
*,
start_s: float,
end_s: float,
hz: float,
burst_at_s: float,
burst_amplitude: float,
burst_duration_s: float = 1.0,
baseline_amplitude: float = 0.0,
) -> tuple[tuple[int, float], ...]:
"""Build a synthetic ``(ts_ns, magnitude)`` stream.
Constant at ``baseline_amplitude`` outside a single rectangular
burst (``burst_amplitude`` for ``burst_duration_s`` starting at
``burst_at_s``). Used so cross-correlation has a clear peak that
tests can assert exact-index for.
"""
out: list[tuple[int, float]] = []
period_s = 1.0 / hz
t = start_s
burst_end_s = burst_at_s + burst_duration_s
while t < end_s:
if burst_at_s <= t < burst_end_s:
out.append((_ns(t), burst_amplitude))
else:
out.append((_ns(t), baseline_amplitude))
t += period_s
return tuple(out)
def _build_double_burst_stream(
*,
start_s: float,
end_s: float,
hz: float,
burst_a_at_s: float,
burst_b_at_s: float,
burst_amplitude: float,
burst_duration_s: float = 1.0,
baseline_amplitude: float = 0.0,
) -> tuple[tuple[int, float], ...]:
"""Two-burst variant to constrain cross-correlation more tightly."""
out: list[tuple[int, float]] = []
period_s = 1.0 / hz
t = start_s
while t < end_s:
if burst_a_at_s <= t < burst_a_at_s + burst_duration_s:
out.append((_ns(t), burst_amplitude))
elif burst_b_at_s <= t < burst_b_at_s + burst_duration_s:
out.append((_ns(t), burst_amplitude))
else:
out.append((_ns(t), baseline_amplitude))
t += period_s
return tuple(out)
# ---------------------------------------------------------------------
# AC-1: takeoff-aligned regression — find_aligned_window must produce
# the same offset (within ± 50 ms) as the AZ-405 compute_offset path
# when the video covers the take-off.
def test_ac1_takeoff_aligned_offset_matches_az405_within_50ms() -> None:
# Arrange: 30 s tlog with a take-off-shaped IMU energy burst at
# t = 2 s; 5 s video with the same-shaped optical-flow burst at
# video_t = 0.5 s (motion onset half a second into the clip).
# AZ-405 would resolve offset_ms = (tlog_takeoff_ns -
# video_motion_onset_ns) // 1e6 ≈ 1.5 s. The AZ-698 aligner
# must agree within 50 ms.
tlog_energy = _build_motion_burst_stream(
start_s=0.0,
end_s=30.0,
hz=10.0,
burst_at_s=2.0,
burst_amplitude=1.2,
burst_duration_s=1.5,
baseline_amplitude=0.0,
)
flow_samples = _build_motion_burst_stream(
start_s=0.0,
end_s=5.0,
hz=10.0,
burst_at_s=0.5,
burst_amplitude=2.0,
burst_duration_s=1.5,
baseline_amplitude=0.0,
)
config = AutoSyncConfig()
expected_offset_ms = _ns(2.0 - 0.5) // 1_000_000
# Act
window = _align_via_cross_correlation(
tlog_energy=tlog_energy,
flow_samples=flow_samples,
config=config,
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
tlog_path=Path("/nonexistent.tlog"),
tlog_source_factory=None,
)
# Assert
assert window.fallback_used is False, "expected primary cross-corr path, not fallback"
assert abs(window.offset_ms - expected_offset_ms) <= 50, (
f"AZ-698 offset {window.offset_ms} ms outside ±50 ms of AZ-405-equivalent "
f"{expected_offset_ms} ms"
)
# ---------------------------------------------------------------------
# AC-2: mid-flight alignment — tlog 030 s with motion burst at t=15 s,
# video 05 s with motion burst at video_t=1 s. Expected:
# tlog_start_ns ≈ (15 - 1) s = 14 s (where video t=0 lands)
# offset_ms ≈ 14 000
def test_ac2_mid_flight_alignment_locates_correct_window() -> None:
# Arrange: distinctive double-burst pattern in both streams so
# cross-correlation lock is unambiguous (single-burst patterns
# can lock on the wrong baseline at edge bins).
tlog_energy = _build_double_burst_stream(
start_s=0.0,
end_s=30.0,
hz=10.0,
burst_a_at_s=15.0,
burst_b_at_s=18.0,
burst_amplitude=1.5,
burst_duration_s=0.8,
baseline_amplitude=0.0,
)
flow_samples = _build_double_burst_stream(
start_s=0.0,
end_s=5.0,
hz=10.0,
burst_a_at_s=1.0,
burst_b_at_s=4.0,
burst_amplitude=2.5,
burst_duration_s=0.8,
baseline_amplitude=0.0,
)
config = AutoSyncConfig()
period_ns = _ns(1.0 / config.alignment_resample_hz)
# Act
window = _align_via_cross_correlation(
tlog_energy=tlog_energy,
flow_samples=flow_samples,
config=config,
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
tlog_path=Path("/nonexistent.tlog"),
tlog_source_factory=None,
)
# Assert
assert window.fallback_used is False
# video burst A at t=1.0s aligns with tlog burst A at t=15.0s
# → video t=0 aligns with tlog t=14.0s within ±1 resample period.
assert abs(window.tlog_start_ns - _ns(14.0)) <= period_ns, (
f"tlog_start_ns={window.tlog_start_ns} not within one resample period "
f"({period_ns} ns) of the expected 14 s"
)
assert abs(window.offset_ms - 14_000) <= 100
assert window.tlog_end_ns > window.tlog_start_ns
# ---------------------------------------------------------------------
# AC-3: TlogReplayFcAdapter seek — messages whose raw _timestamp is
# below tlog_start_ns must NOT reach subscribers.
def _make_fake_msg(*, type_name: str, raw_ts_s: float, **fields: Any) -> SimpleNamespace:
"""Build a pymavlink-shaped fake message for replay-adapter tests."""
msg = SimpleNamespace(_timestamp=raw_ts_s, **fields)
def _get_type() -> str:
return type_name
msg.get_type = _get_type # type: ignore[attr-defined]
return msg
def _build_replay_adapter_with_seek(
*,
tlog_start_ns: int | None,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> tuple[TlogReplayFcAdapter, list[FcTelemetryFrame]]:
"""Construct a TlogReplayFcAdapter wired to deterministic fakes."""
monkeypatch.setenv("BUILD_TLOG_REPLAY_ADAPTER", "ON")
tlog_file = tmp_path / "fake.tlog"
tlog_file.write_bytes(b"\x00")
received: list[FcTelemetryFrame] = []
fake_clock = MagicMock(spec=Clock)
fake_clock.monotonic_ns.return_value = 0
fake_clock.sleep_until_ns.return_value = None
fake_wgs = MagicMock()
fake_fdr = MagicMock()
fake_fdr.enqueue.return_value = None
adapter = TlogReplayFcAdapter(
tlog_path=tlog_file,
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
clock=fake_clock,
wgs_converter=fake_wgs,
fdr_client=fake_fdr,
time_offset_ms=0,
tlog_start_ns=tlog_start_ns,
pace=ReplayPace.ASAP,
)
adapter.subscribe_telemetry(received.append)
return adapter, received
def test_ac3_adapter_seek_skips_pre_window_messages(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
# Arrange: adapter opened with tlog_start_ns = 100 s; feed 5
# IMU messages, two below 100 s (must be skipped) and three at
# or above 100 s (must reach the subscriber).
adapter, received = _build_replay_adapter_with_seek(
tlog_start_ns=_ns(100.0),
tmp_path=tmp_path,
monkeypatch=monkeypatch,
)
pre_window = [
_make_fake_msg(
type_name="RAW_IMU",
raw_ts_s=t,
time_usec=int(t * 1e6),
xacc=0,
yacc=0,
zacc=1000,
xgyro=0,
ygyro=0,
zgyro=0,
)
for t in (50.0, 99.999)
]
in_window = [
_make_fake_msg(
type_name="RAW_IMU",
raw_ts_s=t,
time_usec=int(t * 1e6),
xacc=0,
yacc=0,
zacc=1000,
xgyro=0,
ygyro=0,
zgyro=0,
)
for t in (100.0, 101.5, 110.0)
]
# Act
for msg in pre_window + in_window:
adapter.feed_one_message(msg)
# Assert
assert len(received) == 3, "expected three in-window IMU frames"
assert all(
frame.kind == TelemetryKind.IMU_SAMPLE for frame in received
), "non-IMU frame leaked through"
# ``received_at`` is the raw _timestamp (no offset). Every
# delivered frame's raw timestamp must be ≥ 100 s.
for frame in received:
assert frame.received_at >= _ns(100.0), (
f"frame with received_at={frame.received_at} ns leaked below the seek bound"
)
def test_ac3_adapter_default_no_seek_passes_every_message(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
# Arrange: tlog_start_ns=None (default) → no seek; every message reaches subscribers.
adapter, received = _build_replay_adapter_with_seek(
tlog_start_ns=None,
tmp_path=tmp_path,
monkeypatch=monkeypatch,
)
messages = [
_make_fake_msg(
type_name="RAW_IMU",
raw_ts_s=t,
time_usec=int(t * 1e6),
xacc=0,
yacc=0,
zacc=1000,
xgyro=0,
ygyro=0,
zgyro=0,
)
for t in (10.0, 50.0, 100.0)
]
# Act
for msg in messages:
adapter.feed_one_message(msg)
# Assert
assert len(received) == 3, "default (no seek) must pass every IMU message"
# ---------------------------------------------------------------------
# AC-4: AC-9 frame-window validator passes for both scenarios.
def test_ac4_validator_passes_for_takeoff_aligned_offset() -> None:
# Arrange: video frames at 30 fps for 5 s; tlog IMU at 100 Hz
# for 30 s covering both pre-take-off and post; offset places
# video t=0 at tlog t=2 s.
video_ts = [int(t * 1_000_000_000) for t in (i / 30.0 for i in range(150))]
tlog_ts = [int(t * 1_000_000_000) for t in (i / 100.0 for i in range(3000))]
offset_ms = 2_000
# Act
result = validate_offset_or_fail(
offset_ms,
tlog_imu_timestamps_ns=tlog_ts,
video_frame_timestamps_ns=video_ts,
threshold_pct=95.0,
window_ms=100,
)
# Assert
assert result == 0
def test_ac4_validator_passes_for_mid_flight_offset() -> None:
# Arrange: video covers 05 s; tlog covers 060 s; mid-flight
# offset places video t=0 at tlog t=30 s. Every video frame
# still has an IMU sample within ±100 ms of (vts + 30s) because
# the tlog covers that range densely.
video_ts = [int(t * 1_000_000_000) for t in (i / 30.0 for i in range(150))]
tlog_ts = [int(t * 1_000_000_000) for t in (i / 100.0 for i in range(6000))]
offset_ms = 30_000
# Act
result = validate_offset_or_fail(
offset_ms,
tlog_imu_timestamps_ns=tlog_ts,
video_frame_timestamps_ns=video_ts,
threshold_pct=95.0,
window_ms=100,
)
# Assert
assert result == 0
# ---------------------------------------------------------------------
# Resampler unit tests — pin the binning semantics so future
# regressions are caught explicitly.
def test_resample_uniform_averages_within_bin() -> None:
# Arrange: 3 samples in the first 100 ms bin (values 1, 2, 3 →
# mean 2.0), 1 sample in the second bin (value 4 → 4.0).
samples = (
(_ns(0.00), 1.0),
(_ns(0.03), 2.0),
(_ns(0.06), 3.0),
(_ns(0.15), 4.0),
)
period_ns = _ns(0.10)
# Act
resampled = _resample_uniform(samples, period_ns, origin_ns=0)
# Assert
assert math.isclose(resampled[0], 2.0)
assert math.isclose(resampled[1], 4.0)
def test_resample_uniform_drops_trailing_empty_bins() -> None:
# Arrange: one sample in bin 0, then a 1 s gap before the next sample.
# The samples between get carry-forward of the previous bin's value;
# trailing zeros only appear AFTER the last sample.
samples = (
(_ns(0.0), 5.0),
(_ns(1.05), 7.0),
)
period_ns = _ns(0.1)
# Act
resampled = _resample_uniform(samples, period_ns, origin_ns=0)
# Assert
# The first bin is 5.0, bins 1..9 carry-forward to 5.0 (the previous
# bin's value), and bin 10 captures the t=1.05 s sample as 7.0.
assert resampled[0] == 5.0
assert resampled[-1] == 7.0
# No trailing-zero tail.
assert all(v != 0.0 for v in resampled)
# ---------------------------------------------------------------------
# Fallback path — when cross-correlation confidence is below the
# threshold, find_aligned_window must fall back to the head-takeoff
# detector and set fallback_used=True.
def test_low_confidence_triggers_takeoff_fallback(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
# Arrange: flat-line tlog (no motion) → cross-correlation has no
# meaningful peak. The fallback path opens the real tlog via
# detect_tlog_takeoff which needs a working tlog file. We bypass
# the actual fallback work by raising the threshold to 1.1 (no
# peak can clear it) and stubbing the takeoff detector.
monkeypatch.setattr(
"gps_denied_onboard.replay_input.auto_sync.detect_tlog_takeoff",
lambda path, dialect, config, *, source_factory=None: SimpleNamespace(
onset_ns=_ns(7.0), confidence=0.9
),
)
flat_tlog = tuple(
(_ns(t / 10.0), 0.0) for t in range(0, 100)
)
flat_flow = tuple(
(_ns(t / 10.0), 0.0) for t in range(0, 20)
)
config = AutoSyncConfig(alignment_low_confidence_threshold=0.5)
tlog_path = tmp_path / "fake.tlog"
tlog_path.write_bytes(b"\x00")
# Act
window = _align_via_cross_correlation(
tlog_energy=flat_tlog,
flow_samples=flat_flow,
config=config,
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
tlog_path=tlog_path,
tlog_source_factory=None,
)
# Assert
assert window.fallback_used is True
assert window.tlog_start_ns == _ns(7.0), "fallback did not pick up the stubbed takeoff onset"
# ---------------------------------------------------------------------
# Guard: video stream longer than tlog stream → reject (auto-trim
# requires the video to be a SLICE of a longer tlog).
def test_video_longer_than_tlog_raises() -> None:
# Arrange
tlog_energy = tuple((_ns(t / 10.0), 0.5) for t in range(10))
flow_samples = tuple((_ns(t / 10.0), 0.5) for t in range(50))
config = AutoSyncConfig()
# Act + Assert
with pytest.raises(ReplayInputAdapterError, match="video flow stream is longer"):
_align_via_cross_correlation(
tlog_energy=tlog_energy,
flow_samples=flow_samples,
config=config,
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
tlog_path=Path("/nonexistent.tlog"),
tlog_source_factory=None,
)
# ---------------------------------------------------------------------
# AlignedWindow DTO is frozen + slotted.
def test_aligned_window_is_frozen() -> None:
# Arrange
w = AlignedWindow(
tlog_start_ns=1,
tlog_end_ns=2,
offset_ms=0,
confidence=0.9,
fallback_used=False,
)
# Act + Assert
with pytest.raises((AttributeError, TypeError)):
w.confidence = 0.5 # type: ignore[misc]
# ---------------------------------------------------------------------
# AC-5: end-to-end CLI smoke — skipped here because it requires
# ffmpeg-capable cv2 + the real ``derkachi.tlog``/``.mp4`` binaries.
# The actual CLI run is covered by ``tests/e2e/replay/`` when those
# prerequisites are available.
def _replay_inputs_present() -> bool:
fixtures = Path("_docs/00_problem/input_data/flight_derkachi")
return (fixtures / "derkachi.tlog").is_file() and (fixtures / "derkachi.mp4").is_file()
@pytest.mark.skipif(
not _replay_inputs_present(),
reason="AC-5 e2e smoke requires _docs/00_problem/input_data/flight_derkachi/derkachi.{tlog,mp4}",
)
def test_ac5_cli_auto_trim_smoke_uses_find_aligned_window(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange: this test pins the wiring contract — the `--auto-trim`
# CLI flag must reach ReplayConfig.auto_trim. A full CLI run
# requires the runtime root which is exercised by the e2e suite.
from gps_denied_onboard.cli.replay import _build_replay_config
from gps_denied_onboard.config.schema import Config, ReplayConfig
args = SimpleNamespace(
video=Path("/tmp/v.mp4"),
tlog=Path("/tmp/t.tlog"),
output=Path("/tmp/o.jsonl"),
camera_calibration=Path("/tmp/c.json"),
config_path=Path("/tmp/c.yaml"),
mavlink_signing_key=Path("/tmp/k.bin"),
pace="asap",
time_offset_ms=None,
skip_auto_sync_validation=False,
auto_trim=True,
)
key_file = Path("/tmp/k.bin")
key_file.write_bytes(b"\x00" * 32)
base = Config()
base = type(base)(
mode=base.mode,
log=base.log,
fdr=base.fdr,
runtime=base.runtime,
fc=base.fc,
gcs=base.gcs,
replay=ReplayConfig(),
components=base.components,
)
# Act
new_config = _build_replay_config(args, base)
# Assert
assert new_config.replay.auto_trim is True
assert new_config.replay.time_offset_ms is None
# Cross-reference: the existing AZ-405 fixture still passes (no regression).
def test_autosync_decision_offset_is_within_ac9_window_for_baseline() -> None:
# Arrange: a takeoff-shaped tlog detector result + a video
# motion-onset detector result. compute_offset returns the
# AZ-405 offset_ms which is the AZ-698 baseline AC-1 references.
from gps_denied_onboard.replay_input.auto_sync import _DetectorResult
tlog_result = _DetectorResult(onset_ns=_ns(2.5), confidence=0.9)
video_result = _DetectorResult(onset_ns=_ns(0.5), confidence=0.85)
# Act
decision = compute_offset(tlog_result, video_result)
# Assert
assert decision.offset_ms == 2_000
assert decision.combined_confidence == pytest.approx(0.85, abs=1e-6)