mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 21:21:13 +00:00
f5366bbca1
Real derkachi.tlog covers 3 takeoffs at the same field but the uploaded video covers only the last. Original NCC argmax + AZ-405 head-takeoff fallback both biased toward flight 1, violating the spec's "the last chunk in tlog is relevant" framing. Patch: pre-NCC flight segmenter partitions the IMU energy stream into distinct flights (threshold + gap walk); find_aligned_window restricts NCC search to the last segment; low-confidence fallback uses that segment's start instead of head-takeoff detection. AlignedWindow gains flight_count_detected + selected_flight_index for FDR-visible audit. 7 new unit tests (segmenter shapes + end-to-end multi-flight pipeline + segmented fallback path). 19 AZ-698 tests pass, 113 in the regression slice. Zero new mypy --strict errors. Co-authored-by: Cursor <cursoragent@cursor.com>
936 lines
30 KiB
Python
936 lines
30 KiB
Python
"""AZ-698 — tlog trim + mid-flight cross-correlation alignment tests.
|
||
|
||
Covers AC-1..AC-4 of ``_docs/02_tasks/todo/AZ-698_tlog_trim_midflight_alignment.md``.
|
||
AC-5 (end-to-end CLI smoke) is exercised by the existing replay e2e
|
||
suite in ``tests/e2e/replay/`` and skipped here when its prerequisites
|
||
(ffmpeg-capable cv2 build + real ``derkachi.tlog``) are absent.
|
||
|
||
Style: every test follows the Arrange / Act / Assert pattern.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
from pathlib import Path
|
||
from types import SimpleNamespace
|
||
from typing import Any
|
||
from unittest.mock import MagicMock
|
||
|
||
import pytest
|
||
|
||
from gps_denied_onboard._types.fc import (
|
||
AttitudeSample,
|
||
FcKind,
|
||
FcTelemetryFrame,
|
||
FlightStateSignal,
|
||
GpsHealth,
|
||
ImuTelemetrySample,
|
||
TelemetryKind,
|
||
)
|
||
from gps_denied_onboard.clock import Clock
|
||
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
|
||
ReplayPace,
|
||
TlogReplayFcAdapter,
|
||
)
|
||
from gps_denied_onboard.replay_input.auto_sync import (
|
||
_align_via_cross_correlation,
|
||
_resample_uniform,
|
||
_segment_flights_from_imu_energy,
|
||
compute_offset,
|
||
detect_video_motion_onset,
|
||
find_aligned_window,
|
||
validate_offset_or_fail,
|
||
)
|
||
from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError
|
||
from gps_denied_onboard.replay_input.interface import (
|
||
AlignedWindow,
|
||
AutoSyncConfig,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Synthetic-stream helpers
|
||
|
||
|
||
def _ns(seconds: float) -> int:
|
||
return int(seconds * 1_000_000_000)
|
||
|
||
|
||
def _build_motion_burst_stream(
|
||
*,
|
||
start_s: float,
|
||
end_s: float,
|
||
hz: float,
|
||
burst_at_s: float,
|
||
burst_amplitude: float,
|
||
burst_duration_s: float = 1.0,
|
||
baseline_amplitude: float = 0.0,
|
||
) -> tuple[tuple[int, float], ...]:
|
||
"""Build a synthetic ``(ts_ns, magnitude)`` stream.
|
||
|
||
Constant at ``baseline_amplitude`` outside a single rectangular
|
||
burst (``burst_amplitude`` for ``burst_duration_s`` starting at
|
||
``burst_at_s``). Used so cross-correlation has a clear peak that
|
||
tests can assert exact-index for.
|
||
"""
|
||
out: list[tuple[int, float]] = []
|
||
period_s = 1.0 / hz
|
||
t = start_s
|
||
burst_end_s = burst_at_s + burst_duration_s
|
||
while t < end_s:
|
||
if burst_at_s <= t < burst_end_s:
|
||
out.append((_ns(t), burst_amplitude))
|
||
else:
|
||
out.append((_ns(t), baseline_amplitude))
|
||
t += period_s
|
||
return tuple(out)
|
||
|
||
|
||
def _build_double_burst_stream(
|
||
*,
|
||
start_s: float,
|
||
end_s: float,
|
||
hz: float,
|
||
burst_a_at_s: float,
|
||
burst_b_at_s: float,
|
||
burst_amplitude: float,
|
||
burst_duration_s: float = 1.0,
|
||
baseline_amplitude: float = 0.0,
|
||
) -> tuple[tuple[int, float], ...]:
|
||
"""Two-burst variant to constrain cross-correlation more tightly."""
|
||
out: list[tuple[int, float]] = []
|
||
period_s = 1.0 / hz
|
||
t = start_s
|
||
while t < end_s:
|
||
if burst_a_at_s <= t < burst_a_at_s + burst_duration_s:
|
||
out.append((_ns(t), burst_amplitude))
|
||
elif burst_b_at_s <= t < burst_b_at_s + burst_duration_s:
|
||
out.append((_ns(t), burst_amplitude))
|
||
else:
|
||
out.append((_ns(t), baseline_amplitude))
|
||
t += period_s
|
||
return tuple(out)
|
||
|
||
|
||
def _build_multi_flight_stream(
|
||
*,
|
||
flights: tuple[tuple[float, float], ...],
|
||
end_s: float,
|
||
hz: float,
|
||
in_flight_amplitude: float = 0.3,
|
||
ground_amplitude: float = 0.02,
|
||
) -> tuple[tuple[int, float], ...]:
|
||
"""Build a multi-flight IMU energy stream.
|
||
|
||
``flights`` is a tuple of ``(start_s, end_s)`` per flight. Between
|
||
flights the energy is ``ground_amplitude``; inside each flight it
|
||
is ``in_flight_amplitude``. Used by the multi-flight segmentation
|
||
tests to mimic a real "3 takeoffs at the same field" tlog.
|
||
"""
|
||
out: list[tuple[int, float]] = []
|
||
period_s = 1.0 / hz
|
||
t = 0.0
|
||
while t < end_s:
|
||
in_flight = any(s <= t < e for s, e in flights)
|
||
out.append((_ns(t), in_flight_amplitude if in_flight else ground_amplitude))
|
||
t += period_s
|
||
return tuple(out)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AC-1: takeoff-aligned regression — find_aligned_window must produce
|
||
# the same offset (within ± 50 ms) as the AZ-405 compute_offset path
|
||
# when the video covers the take-off.
|
||
|
||
|
||
def test_ac1_takeoff_aligned_offset_matches_az405_within_50ms() -> None:
|
||
# Arrange: 30 s tlog with a take-off-shaped IMU energy burst at
|
||
# t = 2 s; 5 s video with the same-shaped optical-flow burst at
|
||
# video_t = 0.5 s (motion onset half a second into the clip).
|
||
# AZ-405 would resolve offset_ms = (tlog_takeoff_ns -
|
||
# video_motion_onset_ns) // 1e6 ≈ 1.5 s. The AZ-698 aligner
|
||
# must agree within 50 ms.
|
||
tlog_energy = _build_motion_burst_stream(
|
||
start_s=0.0,
|
||
end_s=30.0,
|
||
hz=10.0,
|
||
burst_at_s=2.0,
|
||
burst_amplitude=1.2,
|
||
burst_duration_s=1.5,
|
||
baseline_amplitude=0.0,
|
||
)
|
||
flow_samples = _build_motion_burst_stream(
|
||
start_s=0.0,
|
||
end_s=5.0,
|
||
hz=10.0,
|
||
burst_at_s=0.5,
|
||
burst_amplitude=2.0,
|
||
burst_duration_s=1.5,
|
||
baseline_amplitude=0.0,
|
||
)
|
||
config = AutoSyncConfig()
|
||
expected_offset_ms = _ns(2.0 - 0.5) // 1_000_000
|
||
|
||
# Act
|
||
window = _align_via_cross_correlation(
|
||
tlog_energy=tlog_energy,
|
||
flow_samples=flow_samples,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=Path("/nonexistent.tlog"),
|
||
tlog_source_factory=None,
|
||
)
|
||
|
||
# Assert
|
||
assert window.fallback_used is False, "expected primary cross-corr path, not fallback"
|
||
assert abs(window.offset_ms - expected_offset_ms) <= 50, (
|
||
f"AZ-698 offset {window.offset_ms} ms outside ±50 ms of AZ-405-equivalent "
|
||
f"{expected_offset_ms} ms"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AC-2: mid-flight alignment — tlog 0–30 s with motion burst at t=15 s,
|
||
# video 0–5 s with motion burst at video_t=1 s. Expected:
|
||
# tlog_start_ns ≈ (15 - 1) s = 14 s (where video t=0 lands)
|
||
# offset_ms ≈ 14 000
|
||
|
||
|
||
def test_ac2_mid_flight_alignment_locates_correct_window() -> None:
|
||
# Arrange: distinctive double-burst pattern in both streams so
|
||
# cross-correlation lock is unambiguous (single-burst patterns
|
||
# can lock on the wrong baseline at edge bins).
|
||
tlog_energy = _build_double_burst_stream(
|
||
start_s=0.0,
|
||
end_s=30.0,
|
||
hz=10.0,
|
||
burst_a_at_s=15.0,
|
||
burst_b_at_s=18.0,
|
||
burst_amplitude=1.5,
|
||
burst_duration_s=0.8,
|
||
baseline_amplitude=0.0,
|
||
)
|
||
flow_samples = _build_double_burst_stream(
|
||
start_s=0.0,
|
||
end_s=5.0,
|
||
hz=10.0,
|
||
burst_a_at_s=1.0,
|
||
burst_b_at_s=4.0,
|
||
burst_amplitude=2.5,
|
||
burst_duration_s=0.8,
|
||
baseline_amplitude=0.0,
|
||
)
|
||
config = AutoSyncConfig()
|
||
period_ns = _ns(1.0 / config.alignment_resample_hz)
|
||
|
||
# Act
|
||
window = _align_via_cross_correlation(
|
||
tlog_energy=tlog_energy,
|
||
flow_samples=flow_samples,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=Path("/nonexistent.tlog"),
|
||
tlog_source_factory=None,
|
||
)
|
||
|
||
# Assert
|
||
assert window.fallback_used is False
|
||
# video burst A at t=1.0s aligns with tlog burst A at t=15.0s
|
||
# → video t=0 aligns with tlog t=14.0s within ±1 resample period.
|
||
assert abs(window.tlog_start_ns - _ns(14.0)) <= period_ns, (
|
||
f"tlog_start_ns={window.tlog_start_ns} not within one resample period "
|
||
f"({period_ns} ns) of the expected 14 s"
|
||
)
|
||
assert abs(window.offset_ms - 14_000) <= 100
|
||
assert window.tlog_end_ns > window.tlog_start_ns
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AC-3: TlogReplayFcAdapter seek — messages whose raw _timestamp is
|
||
# below tlog_start_ns must NOT reach subscribers.
|
||
|
||
|
||
def _make_fake_msg(*, type_name: str, raw_ts_s: float, **fields: Any) -> SimpleNamespace:
|
||
"""Build a pymavlink-shaped fake message for replay-adapter tests."""
|
||
msg = SimpleNamespace(_timestamp=raw_ts_s, **fields)
|
||
|
||
def _get_type() -> str:
|
||
return type_name
|
||
|
||
msg.get_type = _get_type # type: ignore[attr-defined]
|
||
return msg
|
||
|
||
|
||
def _build_replay_adapter_with_seek(
|
||
*,
|
||
tlog_start_ns: int | None,
|
||
tmp_path: Path,
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> tuple[TlogReplayFcAdapter, list[FcTelemetryFrame]]:
|
||
"""Construct a TlogReplayFcAdapter wired to deterministic fakes."""
|
||
monkeypatch.setenv("BUILD_TLOG_REPLAY_ADAPTER", "ON")
|
||
tlog_file = tmp_path / "fake.tlog"
|
||
tlog_file.write_bytes(b"\x00")
|
||
|
||
received: list[FcTelemetryFrame] = []
|
||
|
||
fake_clock = MagicMock(spec=Clock)
|
||
fake_clock.monotonic_ns.return_value = 0
|
||
fake_clock.sleep_until_ns.return_value = None
|
||
fake_wgs = MagicMock()
|
||
fake_fdr = MagicMock()
|
||
fake_fdr.enqueue.return_value = None
|
||
|
||
adapter = TlogReplayFcAdapter(
|
||
tlog_path=tlog_file,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
clock=fake_clock,
|
||
wgs_converter=fake_wgs,
|
||
fdr_client=fake_fdr,
|
||
time_offset_ms=0,
|
||
tlog_start_ns=tlog_start_ns,
|
||
pace=ReplayPace.ASAP,
|
||
)
|
||
adapter.subscribe_telemetry(received.append)
|
||
return adapter, received
|
||
|
||
|
||
def test_ac3_adapter_seek_skips_pre_window_messages(
|
||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
# Arrange: adapter opened with tlog_start_ns = 100 s; feed 5
|
||
# IMU messages, two below 100 s (must be skipped) and three at
|
||
# or above 100 s (must reach the subscriber).
|
||
adapter, received = _build_replay_adapter_with_seek(
|
||
tlog_start_ns=_ns(100.0),
|
||
tmp_path=tmp_path,
|
||
monkeypatch=monkeypatch,
|
||
)
|
||
pre_window = [
|
||
_make_fake_msg(
|
||
type_name="RAW_IMU",
|
||
raw_ts_s=t,
|
||
time_usec=int(t * 1e6),
|
||
xacc=0,
|
||
yacc=0,
|
||
zacc=1000,
|
||
xgyro=0,
|
||
ygyro=0,
|
||
zgyro=0,
|
||
)
|
||
for t in (50.0, 99.999)
|
||
]
|
||
in_window = [
|
||
_make_fake_msg(
|
||
type_name="RAW_IMU",
|
||
raw_ts_s=t,
|
||
time_usec=int(t * 1e6),
|
||
xacc=0,
|
||
yacc=0,
|
||
zacc=1000,
|
||
xgyro=0,
|
||
ygyro=0,
|
||
zgyro=0,
|
||
)
|
||
for t in (100.0, 101.5, 110.0)
|
||
]
|
||
|
||
# Act
|
||
for msg in pre_window + in_window:
|
||
adapter.feed_one_message(msg)
|
||
|
||
# Assert
|
||
assert len(received) == 3, "expected three in-window IMU frames"
|
||
assert all(
|
||
frame.kind == TelemetryKind.IMU_SAMPLE for frame in received
|
||
), "non-IMU frame leaked through"
|
||
# ``received_at`` is the raw _timestamp (no offset). Every
|
||
# delivered frame's raw timestamp must be ≥ 100 s.
|
||
for frame in received:
|
||
assert frame.received_at >= _ns(100.0), (
|
||
f"frame with received_at={frame.received_at} ns leaked below the seek bound"
|
||
)
|
||
|
||
|
||
def test_ac3_adapter_default_no_seek_passes_every_message(
|
||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
# Arrange: tlog_start_ns=None (default) → no seek; every message reaches subscribers.
|
||
adapter, received = _build_replay_adapter_with_seek(
|
||
tlog_start_ns=None,
|
||
tmp_path=tmp_path,
|
||
monkeypatch=monkeypatch,
|
||
)
|
||
messages = [
|
||
_make_fake_msg(
|
||
type_name="RAW_IMU",
|
||
raw_ts_s=t,
|
||
time_usec=int(t * 1e6),
|
||
xacc=0,
|
||
yacc=0,
|
||
zacc=1000,
|
||
xgyro=0,
|
||
ygyro=0,
|
||
zgyro=0,
|
||
)
|
||
for t in (10.0, 50.0, 100.0)
|
||
]
|
||
|
||
# Act
|
||
for msg in messages:
|
||
adapter.feed_one_message(msg)
|
||
|
||
# Assert
|
||
assert len(received) == 3, "default (no seek) must pass every IMU message"
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AC-4: AC-9 frame-window validator passes for both scenarios.
|
||
|
||
|
||
def test_ac4_validator_passes_for_takeoff_aligned_offset() -> None:
|
||
# Arrange: video frames at 30 fps for 5 s; tlog IMU at 100 Hz
|
||
# for 30 s covering both pre-take-off and post; offset places
|
||
# video t=0 at tlog t=2 s.
|
||
video_ts = [int(t * 1_000_000_000) for t in (i / 30.0 for i in range(150))]
|
||
tlog_ts = [int(t * 1_000_000_000) for t in (i / 100.0 for i in range(3000))]
|
||
offset_ms = 2_000
|
||
|
||
# Act
|
||
result = validate_offset_or_fail(
|
||
offset_ms,
|
||
tlog_imu_timestamps_ns=tlog_ts,
|
||
video_frame_timestamps_ns=video_ts,
|
||
threshold_pct=95.0,
|
||
window_ms=100,
|
||
)
|
||
|
||
# Assert
|
||
assert result == 0
|
||
|
||
|
||
def test_ac4_validator_passes_for_mid_flight_offset() -> None:
|
||
# Arrange: video covers 0–5 s; tlog covers 0–60 s; mid-flight
|
||
# offset places video t=0 at tlog t=30 s. Every video frame
|
||
# still has an IMU sample within ±100 ms of (vts + 30s) because
|
||
# the tlog covers that range densely.
|
||
video_ts = [int(t * 1_000_000_000) for t in (i / 30.0 for i in range(150))]
|
||
tlog_ts = [int(t * 1_000_000_000) for t in (i / 100.0 for i in range(6000))]
|
||
offset_ms = 30_000
|
||
|
||
# Act
|
||
result = validate_offset_or_fail(
|
||
offset_ms,
|
||
tlog_imu_timestamps_ns=tlog_ts,
|
||
video_frame_timestamps_ns=video_ts,
|
||
threshold_pct=95.0,
|
||
window_ms=100,
|
||
)
|
||
|
||
# Assert
|
||
assert result == 0
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Resampler unit tests — pin the binning semantics so future
|
||
# regressions are caught explicitly.
|
||
|
||
|
||
def test_resample_uniform_averages_within_bin() -> None:
|
||
# Arrange: 3 samples in the first 100 ms bin (values 1, 2, 3 →
|
||
# mean 2.0), 1 sample in the second bin (value 4 → 4.0).
|
||
samples = (
|
||
(_ns(0.00), 1.0),
|
||
(_ns(0.03), 2.0),
|
||
(_ns(0.06), 3.0),
|
||
(_ns(0.15), 4.0),
|
||
)
|
||
period_ns = _ns(0.10)
|
||
|
||
# Act
|
||
resampled = _resample_uniform(samples, period_ns, origin_ns=0)
|
||
|
||
# Assert
|
||
assert math.isclose(resampled[0], 2.0)
|
||
assert math.isclose(resampled[1], 4.0)
|
||
|
||
|
||
def test_resample_uniform_drops_trailing_empty_bins() -> None:
|
||
# Arrange: one sample in bin 0, then a 1 s gap before the next sample.
|
||
# The samples between get carry-forward of the previous bin's value;
|
||
# trailing zeros only appear AFTER the last sample.
|
||
samples = (
|
||
(_ns(0.0), 5.0),
|
||
(_ns(1.05), 7.0),
|
||
)
|
||
period_ns = _ns(0.1)
|
||
|
||
# Act
|
||
resampled = _resample_uniform(samples, period_ns, origin_ns=0)
|
||
|
||
# Assert
|
||
# The first bin is 5.0, bins 1..9 carry-forward to 5.0 (the previous
|
||
# bin's value), and bin 10 captures the t=1.05 s sample as 7.0.
|
||
assert resampled[0] == 5.0
|
||
assert resampled[-1] == 7.0
|
||
# No trailing-zero tail.
|
||
assert all(v != 0.0 for v in resampled)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Fallback path — when cross-correlation confidence is below the
|
||
# threshold, find_aligned_window must fall back to the head-takeoff
|
||
# detector and set fallback_used=True.
|
||
|
||
|
||
def test_low_confidence_triggers_takeoff_fallback(
|
||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||
) -> None:
|
||
# Arrange: flat-line tlog (no motion) → cross-correlation has no
|
||
# meaningful peak. The fallback path opens the real tlog via
|
||
# detect_tlog_takeoff which needs a working tlog file. We bypass
|
||
# the actual fallback work by raising the threshold to 1.1 (no
|
||
# peak can clear it) and stubbing the takeoff detector.
|
||
monkeypatch.setattr(
|
||
"gps_denied_onboard.replay_input.auto_sync.detect_tlog_takeoff",
|
||
lambda path, dialect, config, *, source_factory=None: SimpleNamespace(
|
||
onset_ns=_ns(7.0), confidence=0.9
|
||
),
|
||
)
|
||
flat_tlog = tuple(
|
||
(_ns(t / 10.0), 0.0) for t in range(0, 100)
|
||
)
|
||
flat_flow = tuple(
|
||
(_ns(t / 10.0), 0.0) for t in range(0, 20)
|
||
)
|
||
config = AutoSyncConfig(alignment_low_confidence_threshold=0.5)
|
||
tlog_path = tmp_path / "fake.tlog"
|
||
tlog_path.write_bytes(b"\x00")
|
||
|
||
# Act
|
||
window = _align_via_cross_correlation(
|
||
tlog_energy=flat_tlog,
|
||
flow_samples=flat_flow,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=tlog_path,
|
||
tlog_source_factory=None,
|
||
)
|
||
|
||
# Assert
|
||
assert window.fallback_used is True
|
||
assert window.tlog_start_ns == _ns(7.0), "fallback did not pick up the stubbed takeoff onset"
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Guard: video stream longer than tlog stream → reject (auto-trim
|
||
# requires the video to be a SLICE of a longer tlog).
|
||
|
||
|
||
def test_video_longer_than_tlog_raises() -> None:
|
||
# Arrange
|
||
tlog_energy = tuple((_ns(t / 10.0), 0.5) for t in range(10))
|
||
flow_samples = tuple((_ns(t / 10.0), 0.5) for t in range(50))
|
||
config = AutoSyncConfig()
|
||
|
||
# Act + Assert
|
||
with pytest.raises(ReplayInputAdapterError, match="video flow stream is longer"):
|
||
_align_via_cross_correlation(
|
||
tlog_energy=tlog_energy,
|
||
flow_samples=flow_samples,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=Path("/nonexistent.tlog"),
|
||
tlog_source_factory=None,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AlignedWindow DTO is frozen + slotted.
|
||
|
||
|
||
def test_aligned_window_is_frozen() -> None:
|
||
# Arrange
|
||
w = AlignedWindow(
|
||
tlog_start_ns=1,
|
||
tlog_end_ns=2,
|
||
offset_ms=0,
|
||
confidence=0.9,
|
||
fallback_used=False,
|
||
)
|
||
|
||
# Act + Assert
|
||
with pytest.raises((AttributeError, TypeError)):
|
||
w.confidence = 0.5 # type: ignore[misc]
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# AC-5: end-to-end CLI smoke — skipped here because it requires
|
||
# ffmpeg-capable cv2 + the real ``derkachi.tlog``/``.mp4`` binaries.
|
||
# The actual CLI run is covered by ``tests/e2e/replay/`` when those
|
||
# prerequisites are available.
|
||
|
||
|
||
def _replay_inputs_present() -> bool:
|
||
fixtures = Path("_docs/00_problem/input_data/flight_derkachi")
|
||
return (fixtures / "derkachi.tlog").is_file() and (fixtures / "derkachi.mp4").is_file()
|
||
|
||
|
||
@pytest.mark.skipif(
|
||
not _replay_inputs_present(),
|
||
reason="AC-5 e2e smoke requires _docs/00_problem/input_data/flight_derkachi/derkachi.{tlog,mp4}",
|
||
)
|
||
def test_ac5_cli_auto_trim_smoke_uses_find_aligned_window(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
# Arrange: this test pins the wiring contract — the `--auto-trim`
|
||
# CLI flag must reach ReplayConfig.auto_trim. A full CLI run
|
||
# requires the runtime root which is exercised by the e2e suite.
|
||
from gps_denied_onboard.cli.replay import _build_replay_config
|
||
from gps_denied_onboard.config.schema import Config, ReplayConfig
|
||
|
||
args = SimpleNamespace(
|
||
video=Path("/tmp/v.mp4"),
|
||
tlog=Path("/tmp/t.tlog"),
|
||
output=Path("/tmp/o.jsonl"),
|
||
camera_calibration=Path("/tmp/c.json"),
|
||
config_path=Path("/tmp/c.yaml"),
|
||
mavlink_signing_key=Path("/tmp/k.bin"),
|
||
pace="asap",
|
||
time_offset_ms=None,
|
||
skip_auto_sync_validation=False,
|
||
auto_trim=True,
|
||
)
|
||
key_file = Path("/tmp/k.bin")
|
||
key_file.write_bytes(b"\x00" * 32)
|
||
base = Config()
|
||
base = type(base)(
|
||
mode=base.mode,
|
||
log=base.log,
|
||
fdr=base.fdr,
|
||
runtime=base.runtime,
|
||
fc=base.fc,
|
||
gcs=base.gcs,
|
||
replay=ReplayConfig(),
|
||
components=base.components,
|
||
)
|
||
|
||
# Act
|
||
new_config = _build_replay_config(args, base)
|
||
|
||
# Assert
|
||
assert new_config.replay.auto_trim is True
|
||
assert new_config.replay.time_offset_ms is None
|
||
|
||
|
||
# Cross-reference: the existing AZ-405 fixture still passes (no regression).
|
||
|
||
|
||
def test_autosync_decision_offset_is_within_ac9_window_for_baseline() -> None:
|
||
# Arrange: a takeoff-shaped tlog detector result + a video
|
||
# motion-onset detector result. compute_offset returns the
|
||
# AZ-405 offset_ms which is the AZ-698 baseline AC-1 references.
|
||
from gps_denied_onboard.replay_input.auto_sync import _DetectorResult
|
||
|
||
tlog_result = _DetectorResult(onset_ns=_ns(2.5), confidence=0.9)
|
||
video_result = _DetectorResult(onset_ns=_ns(0.5), confidence=0.85)
|
||
|
||
# Act
|
||
decision = compute_offset(tlog_result, video_result)
|
||
|
||
# Assert
|
||
assert decision.offset_ms == 2_000
|
||
assert decision.combined_confidence == pytest.approx(0.85, abs=1e-6)
|
||
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Multi-flight tlog handling (user constraint: "if 1 flight take it, if
|
||
# multiple take the last"). The pre-NCC segmenter is the gatekeeper.
|
||
|
||
|
||
def test_segmenter_one_flight_returns_single_span() -> None:
|
||
# Arrange: 120 s tlog with a single flight from t=10..100 s.
|
||
samples = _build_multi_flight_stream(
|
||
flights=((10.0, 100.0),),
|
||
end_s=120.0,
|
||
hz=10.0,
|
||
)
|
||
|
||
# Act
|
||
segments = _segment_flights_from_imu_energy(
|
||
samples,
|
||
motion_threshold=0.1,
|
||
min_flight_duration_ns=_ns(30.0),
|
||
max_internal_gap_ns=_ns(5.0),
|
||
)
|
||
|
||
# Assert
|
||
assert len(segments) == 1
|
||
seg_start_ns, seg_end_ns = segments[0]
|
||
assert abs(seg_start_ns - _ns(10.0)) <= _ns(0.2)
|
||
assert abs(seg_end_ns - _ns(100.0)) <= _ns(0.2)
|
||
|
||
|
||
def test_segmenter_three_flights_returns_three_spans_in_order() -> None:
|
||
# Arrange: 360 s tlog with three takeoffs (60 s flights with 30 s
|
||
# ground gaps between them) — mimics the Derkachi scenario the
|
||
# user flagged: one tlog, three sorties, video covers only the
|
||
# last one.
|
||
flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0))
|
||
samples = _build_multi_flight_stream(
|
||
flights=flights_def,
|
||
end_s=300.0,
|
||
hz=10.0,
|
||
)
|
||
|
||
# Act
|
||
segments = _segment_flights_from_imu_energy(
|
||
samples,
|
||
motion_threshold=0.1,
|
||
min_flight_duration_ns=_ns(30.0),
|
||
max_internal_gap_ns=_ns(5.0),
|
||
)
|
||
|
||
# Assert
|
||
assert len(segments) == 3
|
||
for (actual_start, actual_end), (want_start, want_end) in zip(
|
||
segments, flights_def
|
||
):
|
||
assert abs(actual_start - _ns(want_start)) <= _ns(0.2)
|
||
assert abs(actual_end - _ns(want_end)) <= _ns(0.2)
|
||
|
||
|
||
def test_segmenter_drops_ground_blip_below_min_duration() -> None:
|
||
# Arrange: a 5 s ground manoeuvre (engine test) followed by a
|
||
# real 60 s flight. With min_flight_duration_ns=30 s the blip
|
||
# must be discarded, leaving only the real flight.
|
||
samples = _build_multi_flight_stream(
|
||
flights=((5.0, 10.0), (50.0, 110.0)),
|
||
end_s=120.0,
|
||
hz=10.0,
|
||
)
|
||
|
||
# Act
|
||
segments = _segment_flights_from_imu_energy(
|
||
samples,
|
||
motion_threshold=0.1,
|
||
min_flight_duration_ns=_ns(30.0),
|
||
max_internal_gap_ns=_ns(5.0),
|
||
)
|
||
|
||
# Assert
|
||
assert len(segments) == 1
|
||
seg_start_ns, _seg_end_ns = segments[0]
|
||
assert abs(seg_start_ns - _ns(50.0)) <= _ns(0.2)
|
||
|
||
|
||
def test_segmenter_keeps_brief_cruise_lull_inside_flight() -> None:
|
||
# Arrange: one flight with a 3 s cruise lull mid-way. The lull is
|
||
# below max_internal_gap_ns=5 s, so the segmenter must keep the
|
||
# whole flight as a single segment.
|
||
samples = _build_multi_flight_stream(
|
||
flights=((10.0, 45.0), (48.0, 100.0)),
|
||
end_s=120.0,
|
||
hz=10.0,
|
||
)
|
||
|
||
# Act
|
||
segments = _segment_flights_from_imu_energy(
|
||
samples,
|
||
motion_threshold=0.1,
|
||
min_flight_duration_ns=_ns(30.0),
|
||
max_internal_gap_ns=_ns(5.0),
|
||
)
|
||
|
||
# Assert
|
||
assert len(segments) == 1
|
||
seg_start_ns, seg_end_ns = segments[0]
|
||
assert abs(seg_start_ns - _ns(10.0)) <= _ns(0.2)
|
||
assert abs(seg_end_ns - _ns(100.0)) <= _ns(0.2)
|
||
|
||
|
||
def test_find_aligned_window_picks_last_flight_for_multi_flight_tlog(
|
||
tmp_path: Path,
|
||
) -> None:
|
||
# Arrange: a 300 s tlog with three sorties (10..70, 100..160,
|
||
# 190..250 s). The video covers only the LAST sortie — flow
|
||
# samples at video-clock 0..30 s with a motion burst at
|
||
# video t=5 s that, on the tlog timeline, corresponds to
|
||
# tlog t=200 s (5 s into flight 3 which starts at 190 s).
|
||
flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0))
|
||
tlog_energy = _build_multi_flight_stream(
|
||
flights=flights_def,
|
||
end_s=260.0,
|
||
hz=10.0,
|
||
)
|
||
flow_samples = _build_motion_burst_stream(
|
||
start_s=0.0,
|
||
end_s=30.0,
|
||
hz=10.0,
|
||
burst_at_s=5.0,
|
||
burst_amplitude=2.0,
|
||
burst_duration_s=3.0,
|
||
baseline_amplitude=0.05,
|
||
)
|
||
config = AutoSyncConfig(
|
||
alignment_segment_min_flight_duration_seconds=30.0,
|
||
alignment_segment_max_internal_gap_seconds=5.0,
|
||
)
|
||
|
||
# Inject the pre-loaded IMU energy by monkey-patching the loader
|
||
# used inside find_aligned_window; the function reads a tlog via
|
||
# pymavlink, but for the unit-level invariant we want to assert
|
||
# the segment selection — not the binary parser.
|
||
import gps_denied_onboard.replay_input.auto_sync as auto_sync_mod
|
||
|
||
fake_tlog = tmp_path / "multi_flight.tlog"
|
||
fake_tlog.write_bytes(b"\x00")
|
||
fake_video = tmp_path / "video.mp4"
|
||
fake_video.write_bytes(b"\x00")
|
||
|
||
def _fake_loader(
|
||
path: Path,
|
||
*,
|
||
max_messages: int,
|
||
source_factory: Any,
|
||
) -> tuple[tuple[int, float], ...]:
|
||
return tlog_energy
|
||
|
||
def _fake_frames(
|
||
path: Path, scan_seconds: float,
|
||
) -> "list[tuple[int, Any]]":
|
||
import numpy as np
|
||
|
||
rng = np.random.default_rng(42)
|
||
frames: list[tuple[int, Any]] = []
|
||
prev_offset = np.zeros((16, 16, 3), dtype=np.int16)
|
||
for ts_ns, mag in flow_samples:
|
||
# 3-channel BGR (cvtColor BGR→GRAY needs ≥ 3 channels).
|
||
# During a burst we shift pixels — that motion is what
|
||
# Farneback flow magnitudes pick up.
|
||
base = rng.integers(0, 30, size=(16, 16, 3), dtype=np.int16)
|
||
shift_px = int(mag * 4)
|
||
if shift_px > 0:
|
||
base = np.roll(base, shift=shift_px, axis=0)
|
||
frame = np.clip(base + prev_offset, 0, 255).astype(np.uint8)
|
||
frames.append((ts_ns, frame))
|
||
prev_offset = np.zeros_like(prev_offset)
|
||
return frames
|
||
|
||
monkeypatch = pytest.MonkeyPatch()
|
||
try:
|
||
monkeypatch.setattr(
|
||
auto_sync_mod, "_load_tlog_imu_energy_stream", _fake_loader
|
||
)
|
||
|
||
# Act
|
||
window = find_aligned_window(
|
||
fake_tlog,
|
||
fake_video,
|
||
config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
video_frames_factory=_fake_frames,
|
||
)
|
||
finally:
|
||
monkeypatch.undo()
|
||
|
||
# Assert: the aligner MUST select FLIGHT 3 (190..250 s), NOT
|
||
# flight 1 (10..70 s). Whether NCC locks on or the fallback
|
||
# path fires, the resulting window must lie inside flight 3 —
|
||
# that's the user-visible contract ("take the last flight").
|
||
flight3_start_ns, flight3_end_ns = (_ns(190.0), _ns(250.0))
|
||
assert window.flight_count_detected == 3
|
||
assert window.selected_flight_index == 2
|
||
assert flight3_start_ns <= window.tlog_start_ns <= flight3_end_ns
|
||
# Sanity: did NOT lock onto flight 1 or 2.
|
||
assert window.tlog_start_ns > _ns(160.0)
|
||
|
||
|
||
def test_align_via_cross_correlation_locks_onto_burst_inside_last_segment() -> None:
|
||
# Arrange: pre-segmented tlog energy restricted to flight 3
|
||
# (mimicking what find_aligned_window passes after segmentation),
|
||
# plus a flow stream whose burst pattern matches a specific
|
||
# offset inside that segment. This directly exercises the NCC
|
||
# path with the inputs the post-segmentation aligner sees.
|
||
last_segment_tlog = _build_motion_burst_stream(
|
||
start_s=190.0,
|
||
end_s=250.0,
|
||
hz=10.0,
|
||
burst_at_s=210.0,
|
||
burst_amplitude=1.5,
|
||
burst_duration_s=5.0,
|
||
baseline_amplitude=0.05,
|
||
)
|
||
flow_samples = _build_motion_burst_stream(
|
||
start_s=0.0,
|
||
end_s=30.0,
|
||
hz=10.0,
|
||
burst_at_s=10.0,
|
||
burst_amplitude=2.0,
|
||
burst_duration_s=5.0,
|
||
baseline_amplitude=0.05,
|
||
)
|
||
config = AutoSyncConfig()
|
||
|
||
# Act
|
||
window = _align_via_cross_correlation(
|
||
tlog_energy=last_segment_tlog,
|
||
flow_samples=flow_samples,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=Path("/nonexistent.tlog"),
|
||
tlog_source_factory=None,
|
||
flight_count_detected=3,
|
||
selected_flight_index=2,
|
||
)
|
||
|
||
# Assert: NCC must lock on (high confidence, no fallback). The
|
||
# tlog_start_ns must be the start of the matched 30 s window —
|
||
# video burst at video_t=10 s lines up with tlog_t=210 s ⇒
|
||
# tlog_start_ns ≈ 200 s (210 s − 10 s).
|
||
assert not window.fallback_used
|
||
assert window.confidence > 0.6
|
||
assert window.flight_count_detected == 3
|
||
assert window.selected_flight_index == 2
|
||
assert abs(window.tlog_start_ns - _ns(200.0)) <= _ns(0.2)
|
||
|
||
|
||
def test_find_aligned_window_uses_only_segment_for_segmented_tlog_fallback(
|
||
tmp_path: Path,
|
||
) -> None:
|
||
# Arrange: a 3-flight tlog where the video flow is flat (no
|
||
# structure for NCC to lock onto). NCC must produce confidence
|
||
# ~ 0; the fallback path must use the LAST segment start, NOT
|
||
# the head-takeoff detector (which would lock onto flight 1).
|
||
flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0))
|
||
tlog_energy = _build_multi_flight_stream(
|
||
flights=flights_def,
|
||
end_s=260.0,
|
||
hz=10.0,
|
||
)
|
||
flat_flow = tuple((_ns(t / 10.0), 0.5) for t in range(0, 50))
|
||
config = AutoSyncConfig()
|
||
|
||
# Act
|
||
window = _align_via_cross_correlation(
|
||
tlog_energy=tuple(
|
||
(ts, e) for ts, e in tlog_energy
|
||
if _ns(190.0) <= ts <= _ns(250.0)
|
||
),
|
||
flow_samples=flat_flow,
|
||
config=config,
|
||
target_fc_dialect=FcKind.ARDUPILOT_PLANE,
|
||
tlog_path=tmp_path / "x.tlog",
|
||
tlog_source_factory=None,
|
||
flight_count_detected=3,
|
||
selected_flight_index=2,
|
||
)
|
||
|
||
# Assert
|
||
assert window.fallback_used is True
|
||
assert window.flight_count_detected == 3
|
||
assert window.selected_flight_index == 2
|
||
# The fallback must use flight 3's start, not flight 1's takeoff.
|
||
assert window.tlog_start_ns >= _ns(190.0)
|
||
assert window.tlog_start_ns <= _ns(250.0)
|