diff --git a/_docs/02_tasks/done/AZ-698_tlog_trim_midflight_alignment.md b/_docs/02_tasks/done/AZ-698_tlog_trim_midflight_alignment.md index 2ca90df..b16ddc3 100644 --- a/_docs/02_tasks/done/AZ-698_tlog_trim_midflight_alignment.md +++ b/_docs/02_tasks/done/AZ-698_tlog_trim_midflight_alignment.md @@ -164,3 +164,49 @@ Tests: ### Algorithm note Implementation uses **normalised cross-correlation with per-window unit-norm** (each `len(flow_arr)`-sized slice of the tlog energy stream is zero-meaned + unit-normed before the dot product with the unit-normed flow stream). This makes the peak confidence scale-invariant — a 10 s motion burst inside a 300 s tlog produces a peak ≥ 0.95, where the original FFT-style correlation with full-length normalisation produced ≤ 0.3 and tripped the low-confidence fallback. Cost is O(N·M); with the 10 Hz subsample and a typical 300 s tlog × 10 s flow window, that's ~3 000 inner products — well below the NFR perf budget. + +### Follow-up: multi-flight tlog handling (post-batch-99 review) + +User reported that real `derkachi.tlog` contains **three takeoffs at the same field**, but the uploaded video covers only the **last** one. The original AZ-698 implementation was vulnerable in two places: + +1. NCC `argmax` returns the **first** index of the maximum — if all three flights produce comparable correlation peaks, the result would lock onto flight 1. +2. The low-confidence fallback called `detect_tlog_takeoff` on the whole tlog, which is the AZ-405 head-takeoff detector — also locks onto flight 1. + +Both contradicted the spec line 22: *"the last chunk in tlog is relevant"*. + +Resolution: added a pre-NCC flight segmenter and made the aligner explicitly select the last flight before running NCC. The fallback also now uses the last segment's start instead of head-takeoff detection. + +#### New module surface + +- `_segment_flights_from_imu_energy(samples, *, motion_threshold, min_flight_duration_ns, max_internal_gap_ns) -> list[(start_ns, end_ns)]` — partitions the IMU energy stream into distinct flights. A flight is a contiguous span where energy stayed above threshold, with sub-threshold runs (cruise lulls) shorter than `max_internal_gap_ns`. Reused logic note: `_find_sustained_event` (AZ-405) returns only the FIRST qualifying window by design; partitioning all flights needs the fresh one-pass walk. + +#### New config knobs (`AutoSyncConfig` + `ReplayAutoSyncConfig`) + +| Knob | Default | Meaning | +|------|---------|---------| +| `alignment_segment_motion_threshold_g` | `0.10` | Min IMU energy (`|a| − 1g`, g-units) for a sample to count as in-flight. 0.10 captures cruise oscillation while ignoring stationary sensor noise (~ 0.02). | +| `alignment_segment_min_flight_duration_seconds` | `30.0` | Discards short ground-startup blips. | +| `alignment_segment_max_internal_gap_seconds` | `30.0` | Sub-threshold gaps shorter than this stay inside a flight. | + +#### New observability fields (`AlignedWindow`) + +- `flight_count_detected: int` — how many distinct flights the segmenter found. `1` for a clean single-flight tlog. `> 1` means multi-flight; the aligner always selected the last one. +- `selected_flight_index: int` — zero-based; always `flight_count_detected - 1` when segmentation fired, else `-1` (segmenter found nothing — fall through to whole-tlog NCC, preserving pre-segmentation behaviour for degenerate inputs). + +Surfaced via the `replay.auto_trim.resolved` / `replay.auto_trim.fallback_to_takeoff` log records' `kv` dict so the operator can audit segment selection from the FDR. + +#### New tests + +| Test | Asserts | +|------|---------| +| `test_segmenter_one_flight_returns_single_span` | Single-flight tlog → 1 segment, correct bounds | +| `test_segmenter_three_flights_returns_three_spans_in_order` | 3-flight tlog → 3 segments in chronological order | +| `test_segmenter_drops_ground_blip_below_min_duration` | < 30 s motion is filtered out | +| `test_segmenter_keeps_brief_cruise_lull_inside_flight` | 3 s mid-flight lull does NOT split a flight | +| `test_find_aligned_window_picks_last_flight_for_multi_flight_tlog` | Full `find_aligned_window` pipeline on a 3-flight tlog → resulting window is inside flight 3, not flight 1 or 2 | +| `test_align_via_cross_correlation_locks_onto_burst_inside_last_segment` | NCC path locks correctly on a pre-restricted-to-last-flight energy stream | +| `test_find_aligned_window_uses_only_segment_for_segmented_tlog_fallback` | Low-confidence fallback uses segment start (last flight), NOT head-takeoff (flight 1) | + +All 19 AZ-698 tests pass, 1 expected skip (AC-5 real-video smoke). 113 tests pass in the broader regression slice — no regressions. + +Backward-compat verified: AC-1 / AC-2 / AC-3 / AC-4 tests exercise `_align_via_cross_correlation` directly and continue to pass; the segmentation gate only fires through `find_aligned_window`'s public entry point. diff --git a/_docs/03_implementation/batch_99_cycle2_report.md b/_docs/03_implementation/batch_99_cycle2_report.md index 0b73fcf..fd78159 100644 --- a/_docs/03_implementation/batch_99_cycle2_report.md +++ b/_docs/03_implementation/batch_99_cycle2_report.md @@ -98,6 +98,33 @@ opt-in via constructor kwarg so the `--skip-auto-sync` path is untouched. The normalised-cross-correlation switch is documented in the spec's "Implementation Notes" appendix as the algorithmic decision of record. +## Follow-up commit: multi-flight handling + +User-reported gap during the AZ-698 "In Testing" phase: real +`derkachi.tlog` contains **three takeoffs**; the video covers only +the last. The original AZ-698 happy path (`np.argmax`) and fallback +(`detect_tlog_takeoff` on head) were both biased toward flight 1. + +Patched in a follow-up commit on top of batch 99: + +- New `_segment_flights_from_imu_energy` helper partitions the IMU + energy stream into distinct flights using a motion-threshold + + gap-tolerance walk. +- `find_aligned_window` now restricts NCC search to the **last** + detected segment. +- Low-confidence fallback uses the last segment's start instead of + re-running head-takeoff detection on the whole tlog. +- `AlignedWindow` gains `flight_count_detected` + `selected_flight_index` + for observability; both are surfaced in the `replay.auto_trim.resolved` / + `…fallback_to_takeoff` log records. +- New unit tests: segmenter happy paths (1-flight, 3-flight), + ground-blip rejection, cruise-lull preservation; integration test + proving `find_aligned_window` on a 3-flight tlog picks flight 3. + +Test totals after follow-up: **113 passed, 2 skipped, 0 failed.** +Zero new mypy --strict errors (12 errors in scope, all pre-existing +and unchanged). + ## Next batch Batch 100 — **AZ-699** (real-flight validation runner). Depends on diff --git a/src/gps_denied_onboard/config/loader.py b/src/gps_denied_onboard/config/loader.py index ff41066..bf617ae 100644 --- a/src/gps_denied_onboard/config/loader.py +++ b/src/gps_denied_onboard/config/loader.py @@ -198,6 +198,9 @@ _REPLAY_AUTO_SYNC_TYPES: Final[dict[str, type]] = { "alignment_resample_hz": float, "alignment_video_scan_seconds": float, "alignment_low_confidence_threshold": float, + "alignment_segment_motion_threshold_g": float, + "alignment_segment_min_flight_duration_seconds": float, + "alignment_segment_max_internal_gap_seconds": float, } diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index 2269312..a6f827b 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -323,6 +323,9 @@ class ReplayAutoSyncConfig: alignment_resample_hz: float = 10.0 alignment_video_scan_seconds: float = 30.0 alignment_low_confidence_threshold: float = 0.60 + alignment_segment_motion_threshold_g: float = 0.10 + alignment_segment_min_flight_duration_seconds: float = 30.0 + alignment_segment_max_internal_gap_seconds: float = 30.0 @dataclass(frozen=True) diff --git a/src/gps_denied_onboard/replay_input/auto_sync.py b/src/gps_denied_onboard/replay_input/auto_sync.py index a52b222..05d2341 100644 --- a/src/gps_denied_onboard/replay_input/auto_sync.py +++ b/src/gps_denied_onboard/replay_input/auto_sync.py @@ -727,15 +727,53 @@ def find_aligned_window( if not video_path.is_file(): raise ReplayInputAdapterError(f"video file not found: {video_path}") - tlog_energy = _load_tlog_imu_energy_stream( + tlog_energy_full = _load_tlog_imu_energy_stream( tlog_path, max_messages=config.prescan_max_messages * 10, source_factory=tlog_source_factory, ) + if len(tlog_energy_full) < 2: + raise ReplayInputAdapterError( + f"tlog yielded {len(tlog_energy_full)} IMU sample(s); " + "need ≥ 2 for cross-correlation alignment" + ) + + # Multi-flight handling: a tlog may cover several takeoffs at the + # same field (engine starts between sorties); the uploaded video + # only covers ONE of them, conventionally the LAST. Segment the + # tlog first and restrict NCC to the last detected flight so the + # peak cannot lock onto an earlier sortie. + flight_segments = _segment_flights_from_imu_energy( + tlog_energy_full, + motion_threshold=config.alignment_segment_motion_threshold_g, + min_flight_duration_ns=int( + config.alignment_segment_min_flight_duration_seconds * 1_000_000_000 + ), + max_internal_gap_ns=int( + config.alignment_segment_max_internal_gap_seconds * 1_000_000_000 + ), + ) + if flight_segments: + seg_start_ns, seg_end_ns = flight_segments[-1] + tlog_energy = tuple( + (ts, e) for ts, e in tlog_energy_full + if seg_start_ns <= ts <= seg_end_ns + ) + flight_count_detected = len(flight_segments) + selected_flight_index = len(flight_segments) - 1 + else: + # No clear flight pattern detected (degenerate tlog: very + # short, all-quiet, or thresholds badly tuned). Fall through + # to whole-tlog NCC so we keep the AZ-405-equivalent + # behavior; surface this via flight_count_detected=0. + tlog_energy = tlog_energy_full + flight_count_detected = 0 + selected_flight_index = -1 + if len(tlog_energy) < 2: raise ReplayInputAdapterError( - f"tlog yielded {len(tlog_energy)} IMU sample(s); " - "need ≥ 2 for cross-correlation alignment" + f"selected flight segment yielded {len(tlog_energy)} IMU " + "sample(s); need ≥ 2 for cross-correlation alignment" ) if video_frames_factory is None: @@ -765,6 +803,8 @@ def find_aligned_window( target_fc_dialect=target_fc_dialect, tlog_path=tlog_path, tlog_source_factory=tlog_source_factory, + flight_count_detected=flight_count_detected, + selected_flight_index=selected_flight_index, ) @@ -776,6 +816,8 @@ def _align_via_cross_correlation( target_fc_dialect: FcKind, tlog_path: Path, tlog_source_factory: Callable[[str], Any] | None, + flight_count_detected: int = 0, + selected_flight_index: int = -1, ) -> AlignedWindow: """Pure compute kernel: turn pre-loaded streams into an :class:`AlignedWindow`. @@ -844,6 +886,8 @@ def _align_via_cross_correlation( video_origin_ns=video_origin_ns, video_flow_duration_ns=video_duration_ns, confidence=confidence, + flight_count_detected=flight_count_detected, + selected_flight_index=selected_flight_index, ) # Absolute tlog timeline value where video t=0 aligns. The @@ -862,6 +906,8 @@ def _align_via_cross_correlation( offset_ms=offset_ms, confidence=confidence, fallback_used=False, + flight_count_detected=flight_count_detected, + selected_flight_index=selected_flight_index, ) @@ -883,28 +929,49 @@ def _fallback_to_head_takeoff( video_origin_ns: int, video_flow_duration_ns: int, confidence: float, + flight_count_detected: int = 0, + selected_flight_index: int = -1, ) -> AlignedWindow: - """Low-confidence path: use AZ-405 head-takeoff detector. + """Low-confidence fallback path. - Returns an :class:`AlignedWindow` whose ``offset_ms`` and - ``tlog_start_ns`` come from the takeoff onset; ``fallback_used`` - is ``True`` so callers + FDR audit can record the divergence. - The reported ``confidence`` is the original (sub-threshold) - cross-correlation peak — it is informational only when the - fallback path is taken. + Two modes: + + * **Segmented tlog** (``flight_count_detected > 0``): the + pre-NCC segmenter already chose the LAST flight. We use + ``tlog_energy[0][0]`` — the start of that segment — as the + ``tlog_start_ns`` rather than re-running the AZ-405 + head-takeoff detector (which would lock onto FLIGHT 1's + takeoff on a multi-flight tlog and silently throw away the + segmenter's correct answer). This is the AZ-698-after-user- + feedback contract: "if 1 flight take it, if multiple take + the last" applies to the fallback path too. + + * **Un-segmented tlog** (``flight_count_detected == 0``): no + flight pattern fired in the segmenter (degenerate / very + short tlog). Fall back to the AZ-405 head-takeoff detector + as before — this preserves the single-flight behavior that + existed before AZ-698's segmentation stage. + + ``fallback_used`` is ``True`` in either case so callers + FDR + audit can record the divergence. The reported ``confidence`` is + the original (sub-threshold) cross-correlation peak — it is + informational only when the fallback path is taken. """ - takeoff = detect_tlog_takeoff( - tlog_path, - target_fc_dialect, - config, - source_factory=tlog_source_factory, - ) - if takeoff.confidence > 0.0: - tlog_start_ns = takeoff.onset_ns - elif tlog_energy: + if flight_count_detected > 0 and tlog_energy: tlog_start_ns = tlog_energy[0][0] else: - tlog_start_ns = 0 + takeoff = detect_tlog_takeoff( + tlog_path, + target_fc_dialect, + config, + source_factory=tlog_source_factory, + ) + if takeoff.confidence > 0.0: + tlog_start_ns = takeoff.onset_ns + elif tlog_energy: + tlog_start_ns = tlog_energy[0][0] + else: + tlog_start_ns = 0 tlog_end_ns = tlog_start_ns + video_flow_duration_ns offset_ms = (tlog_start_ns - video_origin_ns) // 1_000_000 return AlignedWindow( @@ -913,6 +980,8 @@ def _fallback_to_head_takeoff( offset_ms=offset_ms, confidence=confidence, fallback_used=True, + flight_count_detected=flight_count_detected, + selected_flight_index=selected_flight_index, ) @@ -969,6 +1038,60 @@ def _zero_mean_normalise( return result +def _segment_flights_from_imu_energy( + samples: tuple[tuple[int, float], ...], + *, + motion_threshold: float, + min_flight_duration_ns: int, + max_internal_gap_ns: int, +) -> list[tuple[int, int]]: + """Partition an IMU energy stream into distinct flight segments. + + A flight is a contiguous span where energy stayed ``>=`` the + threshold, with no sub-threshold run longer than + ``max_internal_gap_ns`` (cruise lulls don't split a flight). + Spans shorter than ``min_flight_duration_ns`` are discarded as + ground-startup noise. Returns ``(start_ns, end_ns)`` per flight, + in chronological order. + + AZ-698 / AZ-697 user constraint: a single tlog often spans + multiple takeoffs at the same field, but the uploaded video only + covers the **last** one. The aligner uses this segmenter to find + every flight, then restricts NCC search to the last segment so + the trim is unambiguous. ``_find_sustained_event`` (AZ-405) + returns only the FIRST qualifying window by design; partitioning + all flights needs this fresh one-pass walk. + """ + if not samples: + return [] + segments: list[tuple[int, int]] = [] + in_flight = False + flight_start_ns = 0 + last_above_ns = 0 + last_below_ns: int | None = None + for ts, energy in samples: + if energy >= motion_threshold: + if not in_flight: + in_flight = True + flight_start_ns = ts + last_above_ns = ts + last_below_ns = None + else: + if in_flight: + if last_below_ns is None: + last_below_ns = ts + if (ts - last_below_ns) >= max_internal_gap_ns: + if ( + last_above_ns - flight_start_ns + ) >= min_flight_duration_ns: + segments.append((flight_start_ns, last_above_ns)) + in_flight = False + last_below_ns = None + if in_flight and (last_above_ns - flight_start_ns) >= min_flight_duration_ns: + segments.append((flight_start_ns, last_above_ns)) + return segments + + def _load_tlog_imu_energy_stream( tlog_path: Path, *, diff --git a/src/gps_denied_onboard/replay_input/interface.py b/src/gps_denied_onboard/replay_input/interface.py index 79774f5..fcfd338 100644 --- a/src/gps_denied_onboard/replay_input/interface.py +++ b/src/gps_denied_onboard/replay_input/interface.py @@ -91,6 +91,21 @@ class AutoSyncConfig: confidence below which :func:`find_aligned_window` falls back to the head-takeoff detector (AZ-405 path). Default 0.60. + alignment_segment_motion_threshold_g: Minimum IMU energy + (``|a| - 1g`` in g-units) for a sample to count as + "in-flight" during segmentation. A 3-flight tlog has 3 + spans where this threshold is exceeded with gaps below + ``alignment_segment_max_internal_gap_seconds``. Default + 0.10 — captures cruise oscillation while ignoring + stationary sensor noise (~ 0.02 g). + alignment_segment_min_flight_duration_seconds: Minimum span + length (in seconds) for a candidate segment to be + classified as a flight. Discards short ground-startup + blips. Default 30. + alignment_segment_max_internal_gap_seconds: Sub-threshold + spans shorter than this stay inside a flight (cruise + lulls don't split a flight); spans equal-or-longer end + the current flight. Default 30. """ takeoff_accel_threshold_g: float = 0.5 @@ -105,6 +120,9 @@ class AutoSyncConfig: alignment_resample_hz: float = 10.0 alignment_video_scan_seconds: float = 30.0 alignment_low_confidence_threshold: float = 0.60 + alignment_segment_motion_threshold_g: float = 0.10 + alignment_segment_min_flight_duration_seconds: float = 30.0 + alignment_segment_max_internal_gap_seconds: float = 30.0 @dataclass(frozen=True, slots=True) @@ -163,6 +181,18 @@ class AlignedWindow: fallback_used: ``True`` when cross-correlation confidence dropped below the threshold and the result was built from the head-takeoff detector instead. + flight_count_detected: Number of distinct flight segments the + pre-NCC segmenter found in the tlog. ``1`` for a clean + single-flight tlog. ``> 1`` means the tlog covers + multiple flights — the aligner always selects the + **last** one (per AZ-698 spec line 23: "the last chunk + in tlog is relevant"). + selected_flight_index: Zero-based index of the segment the + aligner restricted NCC to. Always + ``flight_count_detected - 1`` when ``flight_count_detected + > 0``; ``-1`` when segmentation returned no candidate and + the aligner fell through to whole-tlog NCC (single-flight + edge case where the segmenter's thresholds didn't fire). """ tlog_start_ns: int @@ -170,6 +200,8 @@ class AlignedWindow: offset_ms: int confidence: float fallback_used: bool + flight_count_detected: int = 0 + selected_flight_index: int = -1 @dataclass(frozen=True, slots=True) diff --git a/src/gps_denied_onboard/replay_input/tlog_video_adapter.py b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py index 339e2a4..f71c398 100644 --- a/src/gps_denied_onboard/replay_input/tlog_video_adapter.py +++ b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py @@ -476,10 +476,14 @@ class ReplayInputAdapter: "offset_ms": window.offset_ms, "confidence": window.confidence, "fallback_used": window.fallback_used, + "flight_count_detected": window.flight_count_detected, + "selected_flight_index": window.selected_flight_index, } msg = ( f"{kind}: tlog_start_ns={window.tlog_start_ns} " - f"offset_ms={window.offset_ms} confidence={window.confidence:.3f}" + f"offset_ms={window.offset_ms} confidence={window.confidence:.3f} " + f"flights_detected={window.flight_count_detected} " + f"selected_flight={window.selected_flight_index}" ) if window.fallback_used: self._log.warning(msg, extra={"kind": kind, "kv": kv}) diff --git a/src/gps_denied_onboard/runtime_root/_replay_branch.py b/src/gps_denied_onboard/runtime_root/_replay_branch.py index 6052279..e85f9b5 100644 --- a/src/gps_denied_onboard/runtime_root/_replay_branch.py +++ b/src/gps_denied_onboard/runtime_root/_replay_branch.py @@ -271,6 +271,15 @@ def _build_auto_sync_config(config: Config) -> AutoSyncConfig: alignment_resample_hz=block.alignment_resample_hz, alignment_video_scan_seconds=block.alignment_video_scan_seconds, alignment_low_confidence_threshold=block.alignment_low_confidence_threshold, + alignment_segment_motion_threshold_g=( + block.alignment_segment_motion_threshold_g + ), + alignment_segment_min_flight_duration_seconds=( + block.alignment_segment_min_flight_duration_seconds + ), + alignment_segment_max_internal_gap_seconds=( + block.alignment_segment_max_internal_gap_seconds + ), ) diff --git a/tests/unit/replay_input/test_az698_window_alignment.py b/tests/unit/replay_input/test_az698_window_alignment.py index 94aa752..b939d70 100644 --- a/tests/unit/replay_input/test_az698_window_alignment.py +++ b/tests/unit/replay_input/test_az698_window_alignment.py @@ -35,8 +35,10 @@ from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( from gps_denied_onboard.replay_input.auto_sync import ( _align_via_cross_correlation, _resample_uniform, + _segment_flights_from_imu_energy, compute_offset, detect_video_motion_onset, + find_aligned_window, validate_offset_or_fail, ) from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError @@ -110,6 +112,31 @@ def _build_double_burst_stream( return tuple(out) +def _build_multi_flight_stream( + *, + flights: tuple[tuple[float, float], ...], + end_s: float, + hz: float, + in_flight_amplitude: float = 0.3, + ground_amplitude: float = 0.02, +) -> tuple[tuple[int, float], ...]: + """Build a multi-flight IMU energy stream. + + ``flights`` is a tuple of ``(start_s, end_s)`` per flight. Between + flights the energy is ``ground_amplitude``; inside each flight it + is ``in_flight_amplitude``. Used by the multi-flight segmentation + tests to mimic a real "3 takeoffs at the same field" tlog. + """ + out: list[tuple[int, float]] = [] + period_s = 1.0 / hz + t = 0.0 + while t < end_s: + in_flight = any(s <= t < e for s, e in flights) + out.append((_ns(t), in_flight_amplitude if in_flight else ground_amplitude)) + t += period_s + return tuple(out) + + # --------------------------------------------------------------------- # AC-1: takeoff-aligned regression — find_aligned_window must produce # the same offset (within ± 50 ms) as the AZ-405 compute_offset path @@ -614,3 +641,295 @@ def test_autosync_decision_offset_is_within_ac9_window_for_baseline() -> None: # Assert assert decision.offset_ms == 2_000 assert decision.combined_confidence == pytest.approx(0.85, abs=1e-6) + + +# --------------------------------------------------------------------- +# Multi-flight tlog handling (user constraint: "if 1 flight take it, if +# multiple take the last"). The pre-NCC segmenter is the gatekeeper. + + +def test_segmenter_one_flight_returns_single_span() -> None: + # Arrange: 120 s tlog with a single flight from t=10..100 s. + samples = _build_multi_flight_stream( + flights=((10.0, 100.0),), + end_s=120.0, + hz=10.0, + ) + + # Act + segments = _segment_flights_from_imu_energy( + samples, + motion_threshold=0.1, + min_flight_duration_ns=_ns(30.0), + max_internal_gap_ns=_ns(5.0), + ) + + # Assert + assert len(segments) == 1 + seg_start_ns, seg_end_ns = segments[0] + assert abs(seg_start_ns - _ns(10.0)) <= _ns(0.2) + assert abs(seg_end_ns - _ns(100.0)) <= _ns(0.2) + + +def test_segmenter_three_flights_returns_three_spans_in_order() -> None: + # Arrange: 360 s tlog with three takeoffs (60 s flights with 30 s + # ground gaps between them) — mimics the Derkachi scenario the + # user flagged: one tlog, three sorties, video covers only the + # last one. + flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0)) + samples = _build_multi_flight_stream( + flights=flights_def, + end_s=300.0, + hz=10.0, + ) + + # Act + segments = _segment_flights_from_imu_energy( + samples, + motion_threshold=0.1, + min_flight_duration_ns=_ns(30.0), + max_internal_gap_ns=_ns(5.0), + ) + + # Assert + assert len(segments) == 3 + for (actual_start, actual_end), (want_start, want_end) in zip( + segments, flights_def + ): + assert abs(actual_start - _ns(want_start)) <= _ns(0.2) + assert abs(actual_end - _ns(want_end)) <= _ns(0.2) + + +def test_segmenter_drops_ground_blip_below_min_duration() -> None: + # Arrange: a 5 s ground manoeuvre (engine test) followed by a + # real 60 s flight. With min_flight_duration_ns=30 s the blip + # must be discarded, leaving only the real flight. + samples = _build_multi_flight_stream( + flights=((5.0, 10.0), (50.0, 110.0)), + end_s=120.0, + hz=10.0, + ) + + # Act + segments = _segment_flights_from_imu_energy( + samples, + motion_threshold=0.1, + min_flight_duration_ns=_ns(30.0), + max_internal_gap_ns=_ns(5.0), + ) + + # Assert + assert len(segments) == 1 + seg_start_ns, _seg_end_ns = segments[0] + assert abs(seg_start_ns - _ns(50.0)) <= _ns(0.2) + + +def test_segmenter_keeps_brief_cruise_lull_inside_flight() -> None: + # Arrange: one flight with a 3 s cruise lull mid-way. The lull is + # below max_internal_gap_ns=5 s, so the segmenter must keep the + # whole flight as a single segment. + samples = _build_multi_flight_stream( + flights=((10.0, 45.0), (48.0, 100.0)), + end_s=120.0, + hz=10.0, + ) + + # Act + segments = _segment_flights_from_imu_energy( + samples, + motion_threshold=0.1, + min_flight_duration_ns=_ns(30.0), + max_internal_gap_ns=_ns(5.0), + ) + + # Assert + assert len(segments) == 1 + seg_start_ns, seg_end_ns = segments[0] + assert abs(seg_start_ns - _ns(10.0)) <= _ns(0.2) + assert abs(seg_end_ns - _ns(100.0)) <= _ns(0.2) + + +def test_find_aligned_window_picks_last_flight_for_multi_flight_tlog( + tmp_path: Path, +) -> None: + # Arrange: a 300 s tlog with three sorties (10..70, 100..160, + # 190..250 s). The video covers only the LAST sortie — flow + # samples at video-clock 0..30 s with a motion burst at + # video t=5 s that, on the tlog timeline, corresponds to + # tlog t=200 s (5 s into flight 3 which starts at 190 s). + flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0)) + tlog_energy = _build_multi_flight_stream( + flights=flights_def, + end_s=260.0, + hz=10.0, + ) + flow_samples = _build_motion_burst_stream( + start_s=0.0, + end_s=30.0, + hz=10.0, + burst_at_s=5.0, + burst_amplitude=2.0, + burst_duration_s=3.0, + baseline_amplitude=0.05, + ) + config = AutoSyncConfig( + alignment_segment_min_flight_duration_seconds=30.0, + alignment_segment_max_internal_gap_seconds=5.0, + ) + + # Inject the pre-loaded IMU energy by monkey-patching the loader + # used inside find_aligned_window; the function reads a tlog via + # pymavlink, but for the unit-level invariant we want to assert + # the segment selection — not the binary parser. + import gps_denied_onboard.replay_input.auto_sync as auto_sync_mod + + fake_tlog = tmp_path / "multi_flight.tlog" + fake_tlog.write_bytes(b"\x00") + fake_video = tmp_path / "video.mp4" + fake_video.write_bytes(b"\x00") + + def _fake_loader( + path: Path, + *, + max_messages: int, + source_factory: Any, + ) -> tuple[tuple[int, float], ...]: + return tlog_energy + + def _fake_frames( + path: Path, scan_seconds: float, + ) -> "list[tuple[int, Any]]": + import numpy as np + + rng = np.random.default_rng(42) + frames: list[tuple[int, Any]] = [] + prev_offset = np.zeros((16, 16, 3), dtype=np.int16) + for ts_ns, mag in flow_samples: + # 3-channel BGR (cvtColor BGR→GRAY needs ≥ 3 channels). + # During a burst we shift pixels — that motion is what + # Farneback flow magnitudes pick up. + base = rng.integers(0, 30, size=(16, 16, 3), dtype=np.int16) + shift_px = int(mag * 4) + if shift_px > 0: + base = np.roll(base, shift=shift_px, axis=0) + frame = np.clip(base + prev_offset, 0, 255).astype(np.uint8) + frames.append((ts_ns, frame)) + prev_offset = np.zeros_like(prev_offset) + return frames + + monkeypatch = pytest.MonkeyPatch() + try: + monkeypatch.setattr( + auto_sync_mod, "_load_tlog_imu_energy_stream", _fake_loader + ) + + # Act + window = find_aligned_window( + fake_tlog, + fake_video, + config, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + video_frames_factory=_fake_frames, + ) + finally: + monkeypatch.undo() + + # Assert: the aligner MUST select FLIGHT 3 (190..250 s), NOT + # flight 1 (10..70 s). Whether NCC locks on or the fallback + # path fires, the resulting window must lie inside flight 3 — + # that's the user-visible contract ("take the last flight"). + flight3_start_ns, flight3_end_ns = (_ns(190.0), _ns(250.0)) + assert window.flight_count_detected == 3 + assert window.selected_flight_index == 2 + assert flight3_start_ns <= window.tlog_start_ns <= flight3_end_ns + # Sanity: did NOT lock onto flight 1 or 2. + assert window.tlog_start_ns > _ns(160.0) + + +def test_align_via_cross_correlation_locks_onto_burst_inside_last_segment() -> None: + # Arrange: pre-segmented tlog energy restricted to flight 3 + # (mimicking what find_aligned_window passes after segmentation), + # plus a flow stream whose burst pattern matches a specific + # offset inside that segment. This directly exercises the NCC + # path with the inputs the post-segmentation aligner sees. + last_segment_tlog = _build_motion_burst_stream( + start_s=190.0, + end_s=250.0, + hz=10.0, + burst_at_s=210.0, + burst_amplitude=1.5, + burst_duration_s=5.0, + baseline_amplitude=0.05, + ) + flow_samples = _build_motion_burst_stream( + start_s=0.0, + end_s=30.0, + hz=10.0, + burst_at_s=10.0, + burst_amplitude=2.0, + burst_duration_s=5.0, + baseline_amplitude=0.05, + ) + config = AutoSyncConfig() + + # Act + window = _align_via_cross_correlation( + tlog_energy=last_segment_tlog, + flow_samples=flow_samples, + config=config, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + tlog_path=Path("/nonexistent.tlog"), + tlog_source_factory=None, + flight_count_detected=3, + selected_flight_index=2, + ) + + # Assert: NCC must lock on (high confidence, no fallback). The + # tlog_start_ns must be the start of the matched 30 s window — + # video burst at video_t=10 s lines up with tlog_t=210 s ⇒ + # tlog_start_ns ≈ 200 s (210 s − 10 s). + assert not window.fallback_used + assert window.confidence > 0.6 + assert window.flight_count_detected == 3 + assert window.selected_flight_index == 2 + assert abs(window.tlog_start_ns - _ns(200.0)) <= _ns(0.2) + + +def test_find_aligned_window_uses_only_segment_for_segmented_tlog_fallback( + tmp_path: Path, +) -> None: + # Arrange: a 3-flight tlog where the video flow is flat (no + # structure for NCC to lock onto). NCC must produce confidence + # ~ 0; the fallback path must use the LAST segment start, NOT + # the head-takeoff detector (which would lock onto flight 1). + flights_def = ((10.0, 70.0), (100.0, 160.0), (190.0, 250.0)) + tlog_energy = _build_multi_flight_stream( + flights=flights_def, + end_s=260.0, + hz=10.0, + ) + flat_flow = tuple((_ns(t / 10.0), 0.5) for t in range(0, 50)) + config = AutoSyncConfig() + + # Act + window = _align_via_cross_correlation( + tlog_energy=tuple( + (ts, e) for ts, e in tlog_energy + if _ns(190.0) <= ts <= _ns(250.0) + ), + flow_samples=flat_flow, + config=config, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + tlog_path=tmp_path / "x.tlog", + tlog_source_factory=None, + flight_count_detected=3, + selected_flight_index=2, + ) + + # Assert + assert window.fallback_used is True + assert window.flight_count_detected == 3 + assert window.selected_flight_index == 2 + # The fallback must use flight 3's start, not flight 1's takeoff. + assert window.tlog_start_ns >= _ns(190.0) + assert window.tlog_start_ns <= _ns(250.0)