diff --git a/_docs/02_document/contracts/replay/replay_protocol.md b/_docs/02_document/contracts/replay/replay_protocol.md index 0c422be..0f9097f 100644 --- a/_docs/02_document/contracts/replay/replay_protocol.md +++ b/_docs/02_document/contracts/replay/replay_protocol.md @@ -139,6 +139,7 @@ class ReplayInputAdapter: camera_calibration: CameraCalibration, target_fc_dialect: FcKind, wgs_converter: WgsConverter, + fdr_client: FdrClient, # forwarded to TlogReplayFcAdapter + used for replay_input's own FDR records (auto-sync detected / low-confidence / AC-8 hard-fail) pace: ReplayPace, manual_time_offset_ms: int | None, # None → auto-sync runs (AZ-405) auto_sync_config: AutoSyncConfig, diff --git a/_docs/02_tasks/todo/AZ-405_replay_auto_sync.md b/_docs/02_tasks/done/AZ-405_replay_auto_sync.md similarity index 100% rename from _docs/02_tasks/todo/AZ-405_replay_auto_sync.md rename to _docs/02_tasks/done/AZ-405_replay_auto_sync.md diff --git a/_docs/03_implementation/batch_60_cycle1_report.md b/_docs/03_implementation/batch_60_cycle1_report.md new file mode 100644 index 0000000..7afc409 --- /dev/null +++ b/_docs/03_implementation/batch_60_cycle1_report.md @@ -0,0 +1,107 @@ +# Batch 60 — Cycle 1 Report + +**Date**: 2026-05-14 +**Tasks**: AZ-405 (`replay_input/` Layer-4 coordinator + auto-sync video↔tlog via IMU take-off detection) +**Verdict**: COMPLETE — PASS_WITH_WARNINGS + +## Summary + +Closed the AZ-405 gap in the replay subsystem by landing the `replay_input/` cross-cutting coordinator (Layer 4) and the auto-sync algorithm. After this batch, AZ-401 (composition root branch) has every strategy + every coordinator surface it needs to pivot `compose_root(config)` on `config.mode`. + +The new module follows ADR-011 ("replay is a configuration of the airborne binary"). `ReplayInputAdapter.open()` performs strict ordering so AC-13 holds: + +1. Tlog message-type pre-validation runs FIRST so a tlog missing `RAW_IMU` / `SCALED_IMU2` / `ATTITUDE` raises `ReplayInputAdapterError("tlog missing required message types: [...]")` before any video read. +2. If `manual_time_offset_ms is None`, the auto-sync detectors run; otherwise the manual offset is adopted directly (AC-8 — verified via call-count assertion that the detectors are NOT invoked). +3. The resolved offset is fed through the AC-9 frame-window match validator; a hard-fail raises `"auto-sync hard-fail: …"` so the shared main maps it to CLI exit code 2 (AC-7). +4. The single `Clock` instance is constructed: `TlogDerivedClock` for `pace=ASAP`, `WallClock` for `pace=REALTIME`. Invariant 2. +5. `VideoFileFrameSource` is built first; if construction fails the FC adapter is never opened. The FC adapter's own pre-scan runs as a defensive second sanity check during `open()`. +6. `ReplayInputBundle(frame_source, fc_adapter, clock, resolved_time_offset_ms, auto_sync_result)` is returned. + +`auto_sync.py` is split into pure compute kernels (`_compute_tlog_takeoff_from_samples`, `_compute_video_onset_from_samples`, `compute_offset`, `validate_offset_or_fail`) and disk-reading wrappers (`_load_tlog_samples`, `_read_video_frames`, `_compute_flow_magnitudes`). Tests target the kernels with synthetic fixtures; the wrappers are exercised end-to-end through the coordinator with `tlog_source_factory` / `video_frames_factory` / `video_timestamps_factory` injection points (mirrors the AZ-399 `source_factory` precedent). + +The take-off detector uses the body-frame proper-acceleration excess above the 1 g hover baseline (`abs(total_g - 1.0) > 0.5 g sustained ≥ 0.5 s`) plus a sustained attitude-rate magnitude (`> 1.0 rad/s sustained ≥ 0.5 s`). When both signals fire we take the earlier onset (thrust precedes the body-rate spike on a vertical climb) and `confidence = min(accel_ratio, attitude_ratio)`. When only one signal fires we discount confidence by 0.6 so `combined_confidence` reliably trips the WARN-and-proceed regime (AC-6). When neither fires we fall through to `confidence = 0.0` and let the AC-9 validator decide whether the run is salvageable. + +The video motion-onset detector uses `cv2.calcOpticalFlowFarneback` (dense flow, deterministic given identical input frames per AC-10) rather than pyramidal LK. Mean magnitude per pair is compared against `video_motion_threshold` (default 1.5 px) sustained for `sustained_seconds` (default 0.5 s). + +The contract `_docs/02_document/contracts/replay/replay_protocol.md` v2.0.0 was updated in-batch to add `fdr_client: FdrClient` to the `ReplayInputAdapter.__init__` signature — the v2.0.0 prose was missing it (the AZ-405 task spec had it correctly listed in the Constraints section, so no implementation drift). Captured as F1 Medium/Spec-Gap in the batch review and resolved by the contract update. + +## Files added / modified + +### Added (7) + +- `src/gps_denied_onboard/replay_input/__init__.py` — Public API re-exports (`ReplayInputAdapter`, `ReplayInputBundle`, `AutoSyncDecision`, `AutoSyncConfig`, `ReplayInputAdapterError`). +- `src/gps_denied_onboard/replay_input/errors.py` — `ReplayInputAdapterError(RuntimeError)` taxonomy. +- `src/gps_denied_onboard/replay_input/interface.py` — `AutoSyncConfig`, `AutoSyncDecision`, `ReplayInputBundle` (frozen + slots). +- `src/gps_denied_onboard/replay_input/auto_sync.py` — `detect_tlog_takeoff` + `detect_video_motion_onset` wrappers; `_compute_tlog_takeoff_from_samples` + `_compute_video_onset_from_samples` pure kernels; `compute_offset`; `validate_offset_or_fail` AC-9 validator; `TlogSamples` DTO; `_find_sustained_event` sliding-window helper; `_wrap_pi`; `_load_tlog_samples` + `_read_video_frames` + `_compute_flow_magnitudes` disk readers. +- `src/gps_denied_onboard/replay_input/tlog_video_adapter.py` — `ReplayInputAdapter` class (`open()` + idempotent `close()`); structured `replay.input.opened_manual_offset` / `replay.auto_sync.detected` / `replay.auto_sync.low_confidence` / `replay.auto_sync.ac8_validation_failed` log + FDR mirror. +- `tests/unit/replay_input/__init__.py` — empty marker. +- `tests/unit/replay_input/test_az405_auto_sync.py` — 14 tests covering AC-1..AC-10 (auto-sync kernels + offset compute + AC-9 validator + R-DEMO-3 kernel-side). +- `tests/unit/replay_input/test_az405_replay_input_adapter.py` — 11 tests covering AC-6..AC-13 (coordinator-side) + manual override bypass + clock-strategy-by-pace + idempotent close. + +### Modified (1) + +- `_docs/02_document/contracts/replay/replay_protocol.md` — added `fdr_client: FdrClient` to the `ReplayInputAdapter.__init__` signature with a one-line rationale comment (was missing in v2.0.0). + +## Task Results + +| Task | Status | Files Modified | Focused tests | AC Coverage | Issues | +|--------|--------|-------------------------------------------------------------|---------------|---------------|--------| +| AZ-405 | Done | 5 added under `src/`; 2 added under `tests/unit/replay_input/`; 1 contract clarification | 25/25 pass | 13/13 covered | None | + +## AC Test Coverage: 13/13 covered + +| AC | Test | Status | +|----|------|--------| +| AC-1 | `test_ac1_tlog_takeoff_detector_positive_within_50ms_and_high_confidence` | Covered | +| AC-2 | `test_ac2_tlog_takeoff_detector_low_amplitude_vibration_low_confidence` | Covered | +| AC-3 | `test_ac3_tlog_takeoff_detector_hand_launch_warn_regime` | Covered | +| AC-4 | `test_ac4_video_motion_onset_detected_within_one_frame` | Covered | +| AC-5 | `test_ac5_combined_offset_within_200ms_of_ground_truth` | Covered | +| AC-6 | `test_ac6_low_confidence_warn_and_proceed_does_not_raise` (+ `test_ac6_combined_confidence_takes_minimum_of_inputs`) | Covered | +| AC-7 | `test_ac7_validator_hard_fail_returns_2_for_offset_outside_window` (kernel) + `test_ac7_ac8_validator_hard_fail_raises_on_open` (coordinator) | Covered | +| AC-8 | `test_ac8_manual_override_bypasses_auto_detect` | Covered | +| AC-9 | `test_ac9_validator_passes_for_well_matched_offset` + `test_ac9_threshold_configurable` | Covered | +| AC-10 | `test_ac10_confidence_score_deterministic_across_two_runs` + `test_ac10_video_onset_deterministic_across_two_runs` | Covered | +| AC-11 | `test_ac11_open_returns_complete_bundle_with_correct_strategies` + `_pace_realtime_yields_wall_clock` + `_pace_asap_yields_tlog_derived_clock` + `_resolved_offset_matches_auto_sync_result` | Covered | +| AC-12 | `test_ac12_close_is_idempotent` + `test_close_without_open_does_not_raise` | Covered | +| AC-13 | `test_ac13_missing_imu_messages_fails_fast_before_video_read` + `_missing_attitude_messages_fails_fast` | Covered | + +## Code Review Verdict: PASS_WITH_WARNINGS + +See `_docs/03_implementation/reviews/batch_60_review.md`. Three findings — Medium ×1, Low ×2 — none blocking: + +1. **F1 Medium / Spec-Gap** — Replay protocol contract v2.0.0 prose was missing `fdr_client` from the `ReplayInputAdapter.__init__` signature. Resolved in-batch by updating the contract. +2. **F2 Low / Maintainability** — Confidence aggregator is a `min()` only (no agreement bonus). Acceptable today; AC-1 bar is "≥ 0.85" with both signals strong → `min()` returns 1.0. +3. **F3 Low / Maintainability** — Three test-only injection kwargs on the production constructor. Mirrors the AZ-399 `source_factory` precedent. + +No Critical / High / Architecture findings. Auto-fix not required. + +## Cumulative Code Review Verdict (batches 58-60): PASS_WITH_WARNINGS + +See `_docs/03_implementation/cumulative_review_batches_58-60_cycle1_report.md`. Five findings — Medium ×1 (resolved in-batch), Low ×4 (3 carry-forward from prior cumulative reviews + 1 new). No Architecture findings, no new cyclic dependencies, all cross-component imports respect Public API surfaces. + +## Auto-Fix Attempts: 0 + +## Stuck Agents: None + +## Tests Run + +- Focused suite (`tests/unit/replay_input/`): **25 passed**. +- Replay-adjacent regression (`tests/unit/c8_fc_adapter/`, `tests/unit/frame_source/`, sampled): no regressions. +- Full repo suite: deferred to Step 16 (Final Test Run) per the implement skill's "exactly once at end of implementation phase" cadence. + +## Next Batch + +The replay track is now nine-tenths wired: + +- ✅ `Clock` Protocol (AZ-398, batch 57) +- ✅ `FrameSource` + `VideoFileFrameSource` (AZ-398, batch 57) +- ✅ `TlogReplayFcAdapter` (AZ-399, batch 59) +- ✅ `ReplaySink` + `JsonlReplaySink` + `MavlinkTransport` cut-out (AZ-400, batch 59) +- ✅ `replay_input/` coordinator + auto-sync (AZ-405, this batch) +- ⏳ `compose_root(config)` mode-aware branch (AZ-401) +- ⏳ `gps-denied-replay` CLI (AZ-402) +- ⏳ E2E replay fixture (AZ-404) +- (cancelled) `gps-denied-replay-cli` Dockerfile + SBOM diff (AZ-403 — replaced by ADR-011 single-image design) + +Next eligible batch: AZ-401 alone (the only remaining task whose dependencies are now all satisfied; AZ-402 depends on AZ-401, AZ-404 depends on AZ-401+AZ-402). The C5 orthorectifier track (AZ-389) remains independently eligible and could be batched alongside if scope permits. diff --git a/_docs/03_implementation/cumulative_review_batches_58-60_cycle1_report.md b/_docs/03_implementation/cumulative_review_batches_58-60_cycle1_report.md new file mode 100644 index 0000000..b417fdf --- /dev/null +++ b/_docs/03_implementation/cumulative_review_batches_58-60_cycle1_report.md @@ -0,0 +1,133 @@ +# Cumulative Code Review — Batches 58-60 (Cycle 1) + +**Date**: 2026-05-14 +**Range**: batches 58 (AZ-358 + AZ-361 — C4 OpenCVGtsam pose estimator + Jacobian/thermal hybrid), 59 (AZ-399 + AZ-400 — TlogReplayFcAdapter + JsonlReplaySink/MavlinkTransport), 60 (AZ-405 — `replay_input/` coordinator + auto-sync) +**Compared against**: previous cumulative review batches 55-57 +**Verdict**: **PASS_WITH_WARNINGS** + +## Scope + +The 58-60 trio covers two distinct concerns: + +- **Batch 58** finished C4 pose estimation (Marginals + Jacobian-thermal hybrid). All 11 ACs across AZ-358 + AZ-361 are covered; no Architecture findings; one open follow-up (AZ-361 AC-11 informational latency comparison) carried forward. +- **Batches 59 + 60** brought the **replay subsystem** online end-to-end: AZ-399 added the tlog FC adapter, AZ-400 added the JSONL replay sink + the `MavlinkTransport` Protocol cut-out, and AZ-405 added the `replay_input/` coordinator + auto-sync detector. The composition root branch (AZ-401) is the next consumer in line. + +## Carry-over status from cumulative review 55-57 + +| Prior finding | Status | Notes | +|---------------|--------|-------| +| F1 (Low) — two parallel engine-output-probe helpers (C2 / C3) with FP32 vs FP16 probe dtype divergence | **OPEN — carry forward** | No code in batches 58-60 touched either helper. The TRT engine path that would surface this remains gated behind AZ-321 (lands in a later cycle). Sized at <1 point. | +| F2 (Low) — XFeat imports underscore-prefixed helpers from `_pipeline.py` | **OPEN — carry forward** | No code in batches 58-60 touched `c3_matcher/xfeat.py`. Convention-only; documented for the next refactor pass. | +| F3 (Low) — AZ-347 AC-special-2 latency benchmark not tested | **OPEN — carry forward** | Informational metric per the task spec; remains documented in the per-batch report for traceability. | +| (52-54) F2 (Low) — c1_vio test fakes not yet shared | **OPEN — carry forward** | No movement; remains a future hygiene pass. | + +## Findings (this window) + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| F1 | Medium | Spec-Gap | _docs/02_document/contracts/replay/replay_protocol.md:134-145 | Replay contract `ReplayInputAdapter.__init__` was missing `fdr_client` (resolved in batch 60) | +| F2 | Low | Maintainability | src/gps_denied_onboard/replay_input/auto_sync.py + src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py | Tlog message-type pre-validation logic exists in two places (coordinator-side `_load_tlog_samples` + AZ-399's `_prescan_required_messages`) | +| F3 | Low | Maintainability | src/gps_denied_onboard/replay_input/tlog_video_adapter.py | Three test-only injection kwargs (`tlog_source_factory`, `video_frames_factory`, `video_timestamps_factory`) on the production constructor (batch 60 carry-forward) | +| F4 | Low | Performance | src/gps_denied_onboard/components/c4_pose/opencv_gtsam_estimator.py | Two `cv2.projectPoints` calls per Marginals frame (batch 58 carry-forward) | +| F5 | Low | Spec-Gap | tests/unit/c4_pose/test_az358_361_opencv_gtsam_estimator.py | AZ-361 AC-11 informational Jacobian-vs-Marginals RMSE comparison not asserted (batch 58 carry-forward) | + +### Finding Details + +#### F1: Replay contract `ReplayInputAdapter.__init__` was missing `fdr_client` (Medium / Spec-Gap) + +- **Location**: `_docs/02_document/contracts/replay/replay_protocol.md:134-145` +- **Description**: The replay protocol contract v2.0.0 specified the `ReplayInputAdapter.__init__` signature without an `fdr_client` parameter. The implementation needs `fdr_client` to (a) forward to `TlogReplayFcAdapter` (mandatory per AZ-399) and (b) emit the coordinator's own `replay.auto_sync.{detected,low_confidence,ac8_validation_failed}` FDR records. AZ-405's task spec already lists `fdr_client` in its allowed-imports list, so this was a contract-side gap, not an implementation drift. +- **Status**: resolved in batch 60 — contract updated to include `fdr_client: FdrClient` in the constructor signature. No Architecture finding because the dependency is at the documented Layer-1 boundary. +- **Why surfaced cumulatively**: the gap only became visible when AZ-405 wired the FC adapter into the coordinator; batches 58-59 do not consume the coordinator. + +#### F2: Two parallel tlog message-type pre-validators (Low / Maintainability) + +- **Locations**: + - `src/gps_denied_onboard/replay_input/auto_sync.py` (`_load_tlog_samples` + caller `_load_and_validate_tlog`) — checks `RAW_IMU` / `SCALED_IMU2` + `ATTITUDE` presence to satisfy AC-13. + - `src/gps_denied_onboard/components/c8_fc_adapter/tlog_replay_adapter.py:_prescan_required_messages` (AZ-399) — checks `RAW_IMU` / `SCALED_IMU2` + `ATTITUDE` + `GPS_RAW_INT` / `GPS2_RAW` + `HEARTBEAT`. +- **Description**: The two checks have **partially overlapping** required-message sets and **different error message shapes** (`"tlog missing required message types: [...]"` from the coordinator vs `"tlog missing required messages: [...]; consumed by: [...]"` from the FC adapter). Both fire today: the coordinator runs first to satisfy AC-13's "fail-fast BEFORE any video read", then the FC adapter's pre-scan re-runs as a defensive second sanity check during `open()`. +- **Why this is not a duplicate-symbol violation**: the two checks have **different jobs**. The coordinator-side check is the AC-13 surface — it raises with the coordinator's contract-mandated message shape so the CLI exit-code mapping works. The FC adapter check is the AZ-399 INV-3 (R-DEMO-3) surface — it lists the consumers of the missing groups so the operator knows which downstream component is starved. Merging them would either lose information or leak coordinator concepts into a Layer-4 component that should be coordinator-agnostic. +- **Suggestion**: keep both; revisit if a third caller (e.g., a future analytics tool that wants the same fail-fast behavior) appears. Document the relationship in a future hygiene task. +- **Why Low**: both surfaces are tested; the duplication is documented; no current fixture surfaces a divergent error shape. + +#### F3: Test-only injection kwargs on the production constructor (Low / Maintainability — carry-forward from batch 60) + +- **Location**: `src/gps_denied_onboard/replay_input/tlog_video_adapter.py:ReplayInputAdapter.__init__` +- **Description**: Three kwargs (`tlog_source_factory`, `video_frames_factory`, `video_timestamps_factory`) default to `None` and exist solely so the unit tests can swap in fakes without hitting pymavlink / OpenCV. Mirrors the AZ-399 `TlogReplayFcAdapter`'s `source_factory` precedent in the same epic. +- **Suggestion**: keep — established project pattern. Consider a shared `_TestInjections` Protocol if a third coordinator adopts the same shape. + +#### F4: Two `cv2.projectPoints` calls per Marginals frame (Low / Performance — carry-forward from batch 58) + +- **Location**: `src/gps_denied_onboard/components/c4_pose/opencv_gtsam_estimator.py:_compute_reprojection_residuals` + `_jacobian_covariance` +- **Status**: same as the per-batch report; no AC-blocking impact. Sized at 1-2 points for a future hygiene pass. + +#### F5: AZ-361 AC-11 informational RMSE comparison not asserted (Low / Spec-Gap — carry-forward from batch 58) + +- **Location**: `tests/unit/c4_pose/test_az358_361_opencv_gtsam_estimator.py` +- **Status**: per the task spec, AC-11 is informational and explicitly does not block. Documented for traceability. + +## Phase Summary + +### Phase 1 — Context Loading + +Read inputs: + +- `_docs/03_implementation/reviews/batch_58_review.md` +- `_docs/03_implementation/reviews/batch_59_review.md` +- `_docs/03_implementation/reviews/batch_60_review.md` +- `_docs/03_implementation/cumulative_review_batches_55-57_cycle1_report.md` +- `_docs/02_tasks/done/AZ-358_c4_opencv_gtsam_marginals.md` +- `_docs/02_tasks/done/AZ-361_c4_jacobian_thermal_hybrid.md` +- `_docs/02_tasks/done/AZ-399_replay_tlog_adapter.md` +- `_docs/02_tasks/done/AZ-400_replay_jsonl_sink.md` +- `_docs/02_tasks/todo/AZ-405_replay_auto_sync.md` +- `_docs/02_document/contracts/replay/replay_protocol.md` v2.0.0 +- `_docs/02_document/architecture.md` (ADR-011) +- `_docs/02_document/module-layout.md` + +### Phase 2 — Spec Compliance + +Per-batch reports already verified each AC; this cumulative pass spot-checked the following cross-cutting promises: + +- **Replay protocol Invariant 1** (no mode-aware branches outside the composition root): the `replay_input/` coordinator is the boundary; C1–C7 + C13 see only standard `FrameSource` / `FcAdapter` / `Clock`. AZ-401 will provide the AST-scan test that asserts no `if config.mode == "replay"` lines exist in component files. Not violated by batches 58-60. +- **Replay protocol Invariant 2** (single Clock instance): both batches 59 and 60 honour single-instance construction; the coordinator builds the Clock once and bundles it. +- **Replay protocol Invariant 5** (replay never writes to FC): AZ-399's `emit_external_position` / `emit_status_text` raise `FcEmitError`; AZ-405's coordinator never calls them. Verified by tests in batch 59. +- **Replay protocol Invariant 8** (`time_offset_ms` baked at construction, no live re-tuning): AZ-405's coordinator resolves the offset before constructing `TlogReplayFcAdapter`; the FC adapter receives the resolved value as a constructor argument. + +### Phase 3 — Code Quality + +No new findings beyond per-batch reports + F2 above. Tests across all three batches follow Arrange / Act / Assert with comment markers. + +### Phase 4 — Security + +No new findings. Replay file paths (video, tlog) are operator-supplied and validated for existence before any consumer call. No sensitive data in logs / FDR records. + +### Phase 5 — Performance + +No new findings beyond F4 (carry-forward). + +### Phase 6 — Cross-Task Consistency + +- AZ-405 cleanly consumes AZ-398 (frame_source + clock) + AZ-399 (TlogReplayFcAdapter) + AZ-400 (FdrClient via the existing AZ-273 surface) + AZ-279 (WgsConverter). All Public API surfaces match. +- The `ReplayInputBundle` shape is exactly what AZ-401 will need (the contract documents this). +- `BUILD_VIDEO_FILE_FRAME_SOURCE` and `BUILD_TLOG_REPLAY_ADAPTER` flags are checked at the right boundaries (component-internal in AZ-398/AZ-399; coordinator does NOT add a third flag, per ADR-011). + +### Phase 7 — Architecture Compliance + +- **Layer direction**: `replay_input/` is at Layer 4 per `module-layout.md`. It imports from Layer 1 (foundation) and from two specific Layer-4 strategies (`c8_fc_adapter.tlog_replay_adapter`, `frame_source.video_file`) — this cross-Layer-4 wiring is the documented coordinator pattern from ADR-011 (the coordinator IS the seam where Layer-4 strategies are instantiated). No Layer 3 imports. No back-channel. +- **Public API respect**: every cross-component import in batches 58-60 lives in the imported module's `__all__`. Verified by grepping `__all__` against the new files' import lists. +- **No new cyclic dependencies**: `replay_input/` is a leaf in the import graph until AZ-401 lands the composition-root consumer. +- **Duplicate symbols**: F2 above is the only candidate; classified as Low because the two checks have legitimately different responsibilities. +- **Cross-cutting concerns**: structured logging, FDR enqueue, ISO timestamps, WGS conversion all consumed from shared helpers — no local re-implementation. + +## Verdict Logic + +- 0 Critical, 0 High, 1 Medium (resolved in-batch by contract update), 4 Low (3 carry-forward + 1 new) → **PASS_WITH_WARNINGS**. + +## Outputs + +- `verdict`: PASS_WITH_WARNINGS +- `findings`: 5 (1 Medium + 4 Low) +- `critical_count`: 0 +- `high_count`: 0 +- `report_path`: `_docs/03_implementation/cumulative_review_batches_58-60_cycle1_report.md` diff --git a/_docs/03_implementation/reviews/batch_60_review.md b/_docs/03_implementation/reviews/batch_60_review.md new file mode 100644 index 0000000..76fcc91 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_60_review.md @@ -0,0 +1,129 @@ +# Code Review Report + +**Batch**: 60 (AZ-405) +**Date**: 2026-05-14 +**Verdict**: PASS_WITH_WARNINGS + +## Findings + +| # | Severity | Category | File:Line | Title | +|---|----------|----------|-----------|-------| +| 1 | Medium | Spec-Gap | _docs/02_document/contracts/replay/replay_protocol.md:134-145 | Contract `ReplayInputAdapter.__init__` was missing `fdr_client` (now corrected) | +| 2 | Low | Maintainability | src/gps_denied_onboard/replay_input/auto_sync.py:300-340 | Confidence aggregator is a `min()` only — no agreement-bonus when accel + attitude align | +| 3 | Low | Maintainability | src/gps_denied_onboard/replay_input/tlog_video_adapter.py | Three test-only injection kwargs (`tlog_source_factory`, `video_frames_factory`, `video_timestamps_factory`) added to constructor | + +### Finding Details + +**F1: Contract `ReplayInputAdapter.__init__` did not list `fdr_client`** (Medium / Spec-Gap) + +- Location: `_docs/02_document/contracts/replay/replay_protocol.md:134-145` +- Description: The replay protocol contract v2.0.0 specified the `ReplayInputAdapter.__init__` signature without an `fdr_client` parameter, but the implementation requires one to (a) forward to `TlogReplayFcAdapter` (which is mandatory per AZ-399's contract) and (b) emit the coordinator's own FDR records on the `replay.auto_sync.detected` / `replay.auto_sync.low_confidence` / `replay.auto_sync.ac8_validation_failed` paths. Without `fdr_client` flowing through the coordinator, AZ-401 would have to bypass the coordinator and construct the FC adapter itself — which defeats the entire point of the seam. +- Suggestion: contract updated in this batch to add `fdr_client: FdrClient` to the constructor signature (one-line addition with rationale comment). The AZ-405 task spec's Constraints section already lists `fdr_client` in the Layer-1 imports the coordinator may consume, so the task spec and the implementation agree; only the prose contract was stale. +- Task: AZ-405 + +**F2: Confidence aggregator uses `min()` only** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/replay_input/auto_sync.py:300-340` (`compute_offset` + `_compute_tlog_takeoff_from_samples`) +- Description: `compute_offset` aggregates the take-off and motion-onset confidences as `min(tlog_confidence, video_confidence)` — the weakest signal dominates. AC-3 explicitly tests the case where one signal is weak and we want the combined result to land in the WARN regime, so `min()` is correct for the AC. But with two strong signals, `min()` yields the same combined confidence as either side alone, throwing away the agreement-bonus that two corroborating detectors give. Today the AC bar is "≥ 0.85 confidence" so this is a non-issue. +- Suggestion: leave as-is; revisit if the AZ-404 e2e fixture surfaces fixtures where the WARN regime is hit on legitimate dual-strong-signal flights. +- Task: AZ-405 + +**F3: Test-only injection kwargs leak into the production constructor** (Low / Maintainability) + +- Location: `src/gps_denied_onboard/replay_input/tlog_video_adapter.py` — `__init__` accepts `tlog_source_factory`, `video_frames_factory`, `video_timestamps_factory` +- Description: Three kwargs default to `None` and exist only so unit tests can swap in fakes without hitting pymavlink / OpenCV. Mirrors the AZ-399 `TlogReplayFcAdapter`'s `source_factory` pattern (precedent in the same epic). Production callers pass none of them; the AZ-401 composition-root branch will not reference these names. +- Suggestion: keep — the AZ-399 precedent makes this the established project pattern. Consider migrating both to a shared `_FakeFactories` Protocol if a third coordinator adopts the same injection shape. +- Task: AZ-405 + +## Phase Summary + +### Phase 1 — Context Loading + +Read inputs: + +- `_docs/02_tasks/todo/AZ-405_replay_auto_sync.md` +- `_docs/02_document/contracts/replay/replay_protocol.md` (v2.0.0) +- `_docs/02_document/architecture.md` (ADR-011) +- `_docs/02_document/module-layout.md` (Layer 4, `shared/replay_input` entry) +- `_docs/02_document/epics.md` (E-DEMO-REPLAY ACs 7 / 8 / 9 / 10) + +### Phase 2 — Spec Compliance + +All 13 acceptance criteria are covered by tests in `tests/unit/replay_input/`: + +| AC | Test | Status | +|----|------|--------| +| AC-1 | `test_ac1_tlog_takeoff_detector_positive_within_50ms_and_high_confidence` | Covered | +| AC-2 | `test_ac2_tlog_takeoff_detector_low_amplitude_vibration_low_confidence` | Covered | +| AC-3 | `test_ac3_tlog_takeoff_detector_hand_launch_warn_regime` | Covered | +| AC-4 | `test_ac4_video_motion_onset_detected_within_one_frame` | Covered | +| AC-5 | `test_ac5_combined_offset_within_200ms_of_ground_truth` | Covered | +| AC-6 | `test_ac6_low_confidence_warn_and_proceed_does_not_raise` (+ `test_ac6_combined_confidence_takes_minimum_of_inputs`) | Covered | +| AC-7 | `test_ac7_validator_hard_fail_returns_2_for_offset_outside_window` (kernel) + `test_ac7_ac8_validator_hard_fail_raises_on_open` (coordinator) | Covered | +| AC-8 | `test_ac8_manual_override_bypasses_auto_detect` | Covered | +| AC-9 | `test_ac9_validator_passes_for_well_matched_offset` + `test_ac9_threshold_configurable` | Covered | +| AC-10 | `test_ac10_confidence_score_deterministic_across_two_runs` + `test_ac10_video_onset_deterministic_across_two_runs` | Covered | +| AC-11 | `test_ac11_open_returns_complete_bundle_with_correct_strategies` + `_pace_realtime_yields_wall_clock` + `_pace_asap_yields_tlog_derived_clock` + `_resolved_offset_matches_auto_sync_result` | Covered | +| AC-12 | `test_ac12_close_is_idempotent` + `test_close_without_open_does_not_raise` | Covered | +| AC-13 | `test_ac13_missing_imu_messages_fails_fast_before_video_read` + `_missing_attitude_messages_fails_fast` | Covered | + +Contract compliance — `ReplayInputAdapter.open()` raises with the contract-mandated messages: + +- `"tlog missing required message types: ..."` — verified by AC-13 tests +- `"auto-sync hard-fail: ..."` — verified by `test_ac7_ac8_validator_hard_fail_raises_on_open` +- `"video file unreadable / unsupported codec / ..."` — surfaced from `FrameSourceConfigError` re-raise; not unit-tested directly because the AC list does not require it (AC-13 only covers tlog fail-fast). Functional path is verified by integration with `VideoFileFrameSource` (which has its own AC for the message shape). + +`ReplayInputBundle` shape matches the contract: `frame_source`, `fc_adapter`, `clock`, `resolved_time_offset_ms`, `auto_sync_result`. Frozen + slotted dataclass per ADR-002. + +### Phase 3 — Code Quality + +- SOLID: `auto_sync.py` cleanly splits into pure compute kernels (`_compute_tlog_takeoff_from_samples`, `_compute_video_onset_from_samples`, `compute_offset`, `validate_offset_or_fail`) and disk-reading wrappers (`_load_tlog_samples`, `_read_video_frames`, `_compute_flow_magnitudes`). Tests target the kernels — disk IO is exercised only via the wrappers. +- Error handling: every coordinator-scope failure surfaces as `ReplayInputAdapterError` (subclass of `RuntimeError`). FC-side and frame-source-side errors are caught at the boundary and re-raised in coordinator shape with `__cause__` chaining. +- Naming: clear (`detect_tlog_takeoff`, `detect_video_motion_onset`, `compute_offset`, `validate_offset_or_fail`); thresholds named explicitly (`takeoff_accel_threshold_g`, `match_threshold_pct`). +- Complexity: longest method ≈ 60 lines (`open()`); split with explicit numbered phases in the docstring + helper methods (`_load_and_validate_tlog`, `_run_auto_sync`, `_load_video_timestamps`, `_build_clock`). +- Tests: every test follows Arrange / Act / Assert with `# Arrange|Act|Assert` markers (per `coderule.mdc`). +- Dead code: none introduced. `auto_sync.py` `_build_flag_on` helper is unused — it was added for symmetry with other replay modules but has no consumer in this batch. Acceptable as documented "for symmetry" in its docstring; will be removed if it remains unused after AZ-401 lands. + +### Phase 4 — Security + +- No SQL / command injection vectors. +- No hardcoded secrets. +- Tlog and video file paths are operator-supplied. Both are normalised to `pathlib.Path`; existence checks happen before any file is opened. +- Optional `tlog_source_factory` / `video_frames_factory` / `video_timestamps_factory` injection points are kwargs with `None` defaults; production composition does not supply them. There is no path where untrusted input could supply a malicious factory at runtime. +- The OpenCV dense-flow pass (`cv2.calcOpticalFlowFarneback`) does not deserialise — it consumes already-decoded BGR ndarrays. No unsafe deserialisation surface. + +### Phase 5 — Performance + +- Tlog scan is bounded by `prescan_max_messages` (default 6000 — ~30 s @ 200 Hz) and runs exactly once per `open()` (the result is reused for both the AC-13 missing-messages check AND the auto-sync take-off detector). The FC adapter's own pre-scan opens a fresh handle so the coordinator does not waste tlog parses. +- Video motion-onset scan reads only the leading `video_motion_scan_seconds` (default 10 s). Farneback is dense flow, but bounded by the scan window; AC-4 requires onset within the first ~10 frames so the truncation is intentional. +- AC-9 validator uses `bisect.bisect_left` over a pre-sorted IMU timestamp array → O(F log I) where F = video frames in scan window, I = IMU samples. Linear in the worst case. +- No N+1 query patterns; no blocking I/O in async context (codebase is sync-only). + +### Phase 6 — Cross-Task Consistency + +- AZ-405 consumes `TlogReplayFcAdapter` (AZ-399) + `VideoFileFrameSource` + `WallClock` + `TlogDerivedClock` (AZ-398) + `FdrClient` (AZ-273) + `WgsConverter` (AZ-279) + `iso_ts_now` (AZ-264). All consumed from their documented Public APIs. +- The `BUILD_VIDEO_FILE_FRAME_SOURCE` and `BUILD_TLOG_REPLAY_ADAPTER` flags must both be ON for the coordinator to construct the strategies. The coordinator does NOT add a new build flag of its own — replay-mode gating is the union of the two existing flags + AZ-401's `config.mode == "replay"` check (per spec). +- `AutoSyncConfig` defaults match the `replay_protocol.md` v2.0.0 contract and the AZ-405 spec's "0.5 g, 1 rad/s, 0.5 s sustained" thresholds. AZ-401 will map `config.replay.auto_sync.*` into an `AutoSyncConfig(...)` instance. + +### Phase 7 — Architecture Compliance + +- **Layer direction**: `replay_input` is at Layer 4 per `module-layout.md`. Imports are: + - Layer 1: `_types/{calibration, fc, geo}`, `clock/{tlog_derived, wall_clock}`, `fdr_client/{client, records}`, `frame_source/{errors, video_file}`, `helpers/iso_timestamps`, `helpers/wgs_converter` (TYPE_CHECKING-only). + - Layer 4 (cross-Layer-4 wiring within the same coordinator concern): `c8_fc_adapter/{errors, tlog_replay_adapter}`, `frame_source/video_file`. These are documented in `module-layout.md` as the strategies the coordinator instantiates — this is the intended contract per ADR-011 (the coordinator IS the architectural seam where Layer-4 strategies are instantiated). + - No imports from Layer 3 (no component dependencies). Verified by grep over the new files. +- **Public API respect**: every cross-component import lives in the imported component's documented Public API surface. (`tlog_replay_adapter.TlogReplayFcAdapter`, `tlog_replay_adapter.ReplayPace` — both exported in the AZ-399 module's `__all__`.) +- **No new cyclic dependencies**: `replay_input/` is a leaf in the import graph (no other module imports back into it; AZ-401's `compose_root` will be the first consumer once it lands). +- **Duplicate symbols**: none — `_DetectorResult`, `TlogSamples`, `_load_tlog_samples` are local to `replay_input/auto_sync.py`. The pymavlink message-type constants are local; the AZ-399 adapter has its own equivalent (`_REQUIRED_MESSAGE_GROUPS`) that serves a different purpose (group-OR matching for fail-fast). No overlap warrants extraction. +- **Cross-cutting concerns not locally re-implemented**: structured logging via `logging.getLogger`; FDR enqueue via `FdrClient.enqueue`; ISO timestamps via `iso_ts_now`. All consumed from shared helpers. + +## Verdict Logic + +- 0 Critical, 0 High, 1 Medium (Spec-Gap that was resolved in this batch by updating the contract), 2 Low → **PASS_WITH_WARNINGS**. + +## Outputs + +- `verdict`: PASS_WITH_WARNINGS +- `findings`: 3 (1 Medium + 2 Low) +- `critical_count`: 0 +- `high_count`: 0 +- `report_path`: `_docs/03_implementation/reviews/batch_60_review.md` diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 0036754..3265278 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -12,7 +12,7 @@ sub_step: retry_count: 0 cycle: 1 tracker: jira -last_completed_batch: 59 -last_cumulative_review: batches_55-57 -current_batch: 60 -current_batch_tasks: "AZ-401, AZ-389" +last_completed_batch: 60 +last_cumulative_review: batches_58-60 +current_batch: 61 +current_batch_tasks: "" diff --git a/src/gps_denied_onboard/replay_input/__init__.py b/src/gps_denied_onboard/replay_input/__init__.py new file mode 100644 index 0000000..e7ec188 --- /dev/null +++ b/src/gps_denied_onboard/replay_input/__init__.py @@ -0,0 +1,36 @@ +"""``replay_input/`` cross-cutting coordinator (AZ-405 / E-DEMO-REPLAY). + +Layer-4 module per ``_docs/02_document/module-layout.md``. Converges +``(video, tlog)`` inputs into the standard :class:`FrameSource`, +:class:`FcAdapter`, and :class:`Clock` surfaces consumed by the +airborne composition root. Owns the time-alignment concern between +video frames and tlog IMU/attitude ticks (manual via +``--time-offset-ms`` or automatic via the AZ-405 IMU-take-off +detector). + +New under ADR-011 (replay-as-configuration) — replaces the v1.0.0 +design where replay had its own composition root. + +Public surface re-exports the coordinator class, the bundle DTO, the +auto-sync decision DTO, the auto-sync config DTO, and the coordinator +error class. The detector functions in :mod:`auto_sync` are NOT +re-exported here so the public API stays focused on the composition +root's wiring needs; tests import the detectors via their full module +path. +""" + +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError +from gps_denied_onboard.replay_input.interface import ( + AutoSyncConfig, + AutoSyncDecision, + ReplayInputBundle, +) +from gps_denied_onboard.replay_input.tlog_video_adapter import ReplayInputAdapter + +__all__ = [ + "AutoSyncConfig", + "AutoSyncDecision", + "ReplayInputAdapter", + "ReplayInputAdapterError", + "ReplayInputBundle", +] diff --git a/src/gps_denied_onboard/replay_input/auto_sync.py b/src/gps_denied_onboard/replay_input/auto_sync.py new file mode 100644 index 0000000..1698fa1 --- /dev/null +++ b/src/gps_denied_onboard/replay_input/auto_sync.py @@ -0,0 +1,646 @@ +"""Auto-sync detectors + offset compute + AC-9 validator (AZ-405). + +Three concerns: + +1. **Tlog take-off detector** — walks the head of the tlog, looks for + a sustained vertical-acceleration excess + sustained attitude-rate + excess, returns ``(takeoff_ns, confidence)``. +2. **Video motion-onset detector** — runs OpenCV pyramidal optical + flow over the leading seconds of the video, returns + ``(motion_onset_ns, confidence)``. +3. **AC-9 frame-window match validator** — given a candidate offset + and the tlog/video timestamp series, returns 0 if ≥ 95 % of + video frames have an IMU sample within ± 100 ms after the offset + is applied; 2 otherwise. + +The detector functions are split into a thin path-reading wrapper +(``detect_tlog_takeoff`` / ``detect_video_motion_onset``) and a pure +sample-driven core (``_compute_tlog_takeoff_from_samples`` / +``_compute_video_onset_from_samples``). Tests exercise the pure cores +directly with synthetic fixtures; production calls the wrappers, +which read the tlog via ``pymavlink`` and the video via ``cv2``. + +Both wrappers accept an optional ``source_factory`` (tlog) / +``frames_factory`` (video) injection point so unit tests can swap in +fakes without touching the filesystem (mirrors AZ-399's pattern). +""" + +from __future__ import annotations + +import bisect +import math +import os +from collections.abc import Callable, Iterable +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from gps_denied_onboard._types.fc import FcKind +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError +from gps_denied_onboard.replay_input.interface import AutoSyncConfig, AutoSyncDecision + +if TYPE_CHECKING: + import numpy as np + +__all__ = [ + "TlogSamples", + "compute_offset", + "detect_tlog_takeoff", + "detect_video_motion_onset", + "validate_offset_or_fail", +] + + +# Conversion: MAVLink RAW_IMU / SCALED_IMU2 publish accelerometer +# components in mG (milli-G); 1 g ≡ 9.80665 m/s² by ISO 80000-3. +_MG_PER_G: float = 1000.0 +# Per the AZ-405 spec, the vertical-accel signal of interest is the +# magnitude excess above gravity (i.e., body acceleration regardless +# of frame orientation). At rest |a| ≈ 1 g; during upward thrust |a| +# > 1 g; during free-fall |a| ≈ 0 g. The take-off pattern is a +# sustained excess with positive sign (upward thrust), so we use +# ``|total_g - 1.0|`` as the criterion. +_REST_TOTAL_G: float = 1.0 + + +# --------------------------------------------------------------------- +# DTOs (internal — public API surfaces results via AutoSyncDecision) + + +@dataclass(frozen=True, slots=True) +class _DetectorResult: + """Outcome of a single detector pass. + + ``onset_ns`` is the best-guess event start (ns); ``confidence`` + is in [0, 1] and reflects how sustained the signal was relative + to the configured threshold + sustained-time requirement. + """ + + onset_ns: int + confidence: float + + +@dataclass(frozen=True, slots=True) +class TlogSamples: + """Pre-loaded tlog samples extracted by the take-off detector. + + Used as the input shape for :func:`_compute_tlog_takeoff_from_samples` + so unit tests can build a deterministic fixture without parsing a + real ``.tlog`` file. + + Attributes: + accel: Sequence of ``(ts_ns, total_accel_g)`` pairs sourced + from ``RAW_IMU`` / ``SCALED_IMU2`` messages. + attitude: Sequence of ``(ts_ns, roll_rad, pitch_rad, yaw_rad)`` + tuples sourced from ``ATTITUDE`` messages. + imu_count_by_type: Map of message-type-name → count, used for + the ``"tlog missing required message types: [...]"`` + error path (R-DEMO-3). + """ + + accel: tuple[tuple[int, float], ...] + attitude: tuple[tuple[int, float, float, float], ...] + imu_count_by_type: dict[str, int] + + +# --------------------------------------------------------------------- +# Public entrypoints + + +def detect_tlog_takeoff( + tlog_path: Path, + target_fc_dialect: FcKind, + config: AutoSyncConfig, + *, + source_factory: Callable[[str], Any] | None = None, +) -> _DetectorResult: + """Walk the tlog head, detect the take-off pattern, return result. + + Args: + tlog_path: Path to the tlog file. Existence is checked at + entry. + target_fc_dialect: ``ARDUPILOT_PLANE`` or ``INAV``. Both speak + ``ardupilotmega`` MAVLink on the GCS telemetry channel + (the iNav-side native MSP traffic is irrelevant here); + this parameter is accepted for parity with the rest of + the replay surface and is also used in the missing- + messages error to name the dialect explicitly. + config: Operator-tunable thresholds (see + :class:`AutoSyncConfig`). + source_factory: Test-only injection — when provided, replaces + the pymavlink open call with the factory's return value. + The factory must yield an object with ``recv_match`` / + ``close`` semantics matching pymavlink's + ``mavutil.mavlink_connection``. + + Raises: + ReplayInputAdapterError: When the tlog is missing + ``RAW_IMU`` / ``SCALED_IMU2`` (no IMU samples) or + ``ATTITUDE`` (no attitude samples). This is the R-DEMO-3 + fail-fast path — it surfaces BEFORE any video read in the + coordinator's ``open()`` flow. + """ + if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV): + raise ReplayInputAdapterError( + f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; got {target_fc_dialect!r}" + ) + if not tlog_path.is_file(): + raise ReplayInputAdapterError(f"tlog file not found: {tlog_path}") + samples = _load_tlog_samples( + tlog_path, + config.prescan_max_messages, + source_factory=source_factory, + ) + return _compute_tlog_takeoff_from_samples(samples, config) + + +def detect_video_motion_onset( + video_path: Path, + config: AutoSyncConfig, + *, + frames_factory: Callable[[Path, float], Iterable[tuple[int, "np.ndarray"]]] + | None = None, +) -> _DetectorResult: + """Scan the leading video segment, detect motion onset, return result. + + Args: + video_path: Path to an MP4 / MKV / AVI file. + config: Operator-tunable thresholds (see + :class:`AutoSyncConfig`). + frames_factory: Test-only injection — when provided, returns + a synthetic iterable of ``(monotonic_ns, frame_bgr)`` + tuples. Must yield at least 2 frames for the pairwise + optical-flow magnitudes to compute. + + Raises: + ReplayInputAdapterError: When the video file is missing or + unreadable, or fewer than 2 frames are decoded. + """ + if not video_path.is_file(): + raise ReplayInputAdapterError(f"video file not found: {video_path}") + if frames_factory is None: + frames = list(_read_video_frames(video_path, config.video_motion_scan_seconds)) + else: + frames = list(frames_factory(video_path, config.video_motion_scan_seconds)) + if len(frames) < 2: + raise ReplayInputAdapterError( + f"video file unreadable or too short: {video_path} " + f"(decoded {len(frames)} frame(s); need ≥ 2)" + ) + flow_samples = _compute_flow_magnitudes(frames) + return _compute_video_onset_from_samples(flow_samples, config) + + +def compute_offset( + tlog_result: _DetectorResult, + video_result: _DetectorResult, +) -> AutoSyncDecision: + """Combine tlog + video detector outputs into an :class:`AutoSyncDecision`. + + Offset semantics (positive = video starts before take-off recorded + in tlog): ``offset_ns = tlog_takeoff_ns - video_motion_onset_ns``. + Combined confidence = ``min(tlog_confidence, video_confidence)`` — + the weakest signal dominates so downstream WARN-and-proceed (AC-6) + fires whenever either side is unreliable. + """ + offset_ns = tlog_result.onset_ns - video_result.onset_ns + combined = min(tlog_result.confidence, video_result.confidence) + return AutoSyncDecision( + offset_ms=offset_ns // 1_000_000, + tlog_takeoff_ns=tlog_result.onset_ns, + video_motion_onset_ns=video_result.onset_ns, + tlog_confidence=tlog_result.confidence, + video_confidence=video_result.confidence, + combined_confidence=combined, + ) + + +def validate_offset_or_fail( + offset_ms: int, + tlog_imu_timestamps_ns: Iterable[int], + video_frame_timestamps_ns: Iterable[int], + threshold_pct: float, + *, + window_ms: int = 100, +) -> int: + """AC-9 frame-window match validator. + + Returns ``0`` when ≥ ``threshold_pct`` % of video frames have an + IMU sample within ± ``window_ms`` after the offset is applied; + returns ``2`` otherwise (CLI exit code for AC-8 hard-fail). + + The check is symmetric in offset sign — the offset is added to + each video timestamp and the nearest tlog IMU timestamp is then + looked up by binary search. + """ + video_list = list(video_frame_timestamps_ns) + if not video_list: + # Degenerate input — no frames to match. The replay binary + # rejects empty videos earlier, so reaching this branch + # would be a bug; return 2 so the operator sees the hard-fail + # rather than a false PASS. + return 2 + tlog_sorted = sorted(tlog_imu_timestamps_ns) + if not tlog_sorted: + return 2 + offset_ns = int(offset_ms) * 1_000_000 + window_ns = int(window_ms) * 1_000_000 + matched = 0 + for vts in video_list: + target_ns = vts + offset_ns + idx = bisect.bisect_left(tlog_sorted, target_ns) + # The nearest IMU sample is whichever of the immediate + # neighbours of `target_ns` is closer. Either may be out of + # range at the ends of the array. + nearest: int | None = None + for j in (idx - 1, idx): + if 0 <= j < len(tlog_sorted): + cand = tlog_sorted[j] + if nearest is None or abs(cand - target_ns) < abs(nearest - target_ns): + nearest = cand + if nearest is not None and abs(nearest - target_ns) <= window_ns: + matched += 1 + match_pct = (matched / len(video_list)) * 100.0 + return 0 if match_pct >= threshold_pct else 2 + + +# --------------------------------------------------------------------- +# Pure compute kernels (testable without disk IO) + + +def _compute_tlog_takeoff_from_samples( + samples: TlogSamples, + config: AutoSyncConfig, +) -> _DetectorResult: + """Pure detector: turn pre-loaded tlog samples into a result. + + Algorithm: find the first sustained-window where (a) accel + magnitude excess above 1 g exceeds the threshold for at least + ``sustained_seconds``, and (b) attitude-rate magnitude exceeds + its threshold sustained over the same duration. Combined + confidence = ``min(accel_ratio, attitude_ratio)`` — both + signals must agree for a high-confidence take-off. + + Raises: + ReplayInputAdapterError: When the tlog had no IMU samples or + no ATTITUDE samples (R-DEMO-3 fail-fast). + """ + if not samples.accel: + missing = ["RAW_IMU", "SCALED_IMU2"] + raise ReplayInputAdapterError( + f"tlog missing required message types: {missing}" + ) + if not samples.attitude: + raise ReplayInputAdapterError( + "tlog missing required message types: ['ATTITUDE']" + ) + + sustained_ns = int(config.sustained_seconds * 1_000_000_000) + + # Pair-wise attitude rates (rad/s magnitude vector) — emitted at + # the timestamp of the LATER sample so the rate aligns with when + # it is observable downstream. + attitude_rates: list[tuple[int, float]] = [] + for i in range(1, len(samples.attitude)): + ts_prev, roll_prev, pitch_prev, yaw_prev = samples.attitude[i - 1] + ts_curr, roll_curr, pitch_curr, yaw_curr = samples.attitude[i] + dt_s = (ts_curr - ts_prev) / 1_000_000_000.0 + if dt_s <= 0.0: + continue + dr = roll_curr - roll_prev + dp = pitch_curr - pitch_prev + dy = _wrap_pi(yaw_curr - yaw_prev) + rate_mag = math.sqrt((dr / dt_s) ** 2 + (dp / dt_s) ** 2 + (dy / dt_s) ** 2) + attitude_rates.append((ts_curr, rate_mag)) + + accel_excess = tuple( + (ts, abs(total_g - _REST_TOTAL_G)) for ts, total_g in samples.accel + ) + + accel_event = _find_sustained_event( + accel_excess, + threshold=config.takeoff_accel_threshold_g, + sustained_ns=sustained_ns, + ) + attitude_event = _find_sustained_event( + tuple(attitude_rates), + threshold=config.takeoff_attitude_rate_threshold_rad_s, + sustained_ns=sustained_ns, + ) + + if accel_event is None and attitude_event is None: + # Neither signal crossed; best we can do is flag "no clear + # take-off" so the coordinator can WARN and continue with the + # tlog start as a fallback origin. + first_ns = samples.accel[0][0] + return _DetectorResult(onset_ns=first_ns, confidence=0.0) + + if accel_event is not None and attitude_event is not None: + # Both signals fired — they should both point at the same + # event. We adopt the EARLIER of the two onsets so the offset + # is referenced against the moment thrust began (the attitude + # body-rate spike usually trails the thrust by a few hundred + # ms during a vertical climb). + onset_ns = min(accel_event[0], attitude_event[0]) + # Confidence is the weakest of the two signals, scaled by + # how cleanly they agree. We keep it simple: min(). + confidence = min(accel_event[1], attitude_event[1]) + elif accel_event is not None: + # Only the accel signal — discount confidence so the + # combined offset eventually trips the WARN-and-proceed + # threshold (combined_confidence < 0.80 → AC-6). + onset_ns, raw_conf = accel_event + confidence = raw_conf * 0.6 + else: + # Only attitude rate — same rationale as above. The + # mypy-narrowing else covers attitude_event is not None. + assert attitude_event is not None + onset_ns, raw_conf = attitude_event + confidence = raw_conf * 0.6 + + return _DetectorResult(onset_ns=onset_ns, confidence=confidence) + + +def _compute_video_onset_from_samples( + flow_samples: tuple[tuple[int, float], ...], + config: AutoSyncConfig, +) -> _DetectorResult: + """Pure detector: turn pre-computed optical-flow magnitudes into a result. + + Algorithm: find the first sustained window where the flow + magnitude exceeds the configured threshold for at least + ``sustained_seconds``. Confidence = sustained ratio. + """ + if not flow_samples: + return _DetectorResult(onset_ns=0, confidence=0.0) + sustained_ns = int(config.sustained_seconds * 1_000_000_000) + event = _find_sustained_event( + flow_samples, + threshold=config.video_motion_threshold, + sustained_ns=sustained_ns, + ) + if event is None: + return _DetectorResult(onset_ns=flow_samples[0][0], confidence=0.0) + onset_ns, confidence = event + return _DetectorResult(onset_ns=onset_ns, confidence=confidence) + + +def _find_sustained_event( + samples: tuple[tuple[int, float], ...] | list[tuple[int, float]], + *, + threshold: float, + sustained_ns: int, +) -> tuple[int, float] | None: + """Sliding-window scan: return ``(start_ns, ratio)`` of the + earliest window where the fraction of samples above + ``threshold`` is maximised, provided that fraction is ≥ 0.5 + (signal-vs-noise floor) and the window covers at least 80 % of + ``sustained_ns`` (guards against truncated windows at the tail). + + Returns ``None`` when no qualifying window exists. + """ + seq = list(samples) + n = len(seq) + if n < 2: + return None + best_start_ns: int | None = None + best_ratio = 0.0 + min_window_ns = int(sustained_ns * 0.8) + for i in range(n): + start_ns = seq[i][0] + end_ns = start_ns + sustained_ns + # Walk j forward while still inside the window. + j = i + above = 0 + while j < n and seq[j][0] <= end_ns: + if seq[j][1] > threshold: + above += 1 + j += 1 + window_size = j - i + if window_size < 2: + continue + window_dur_ns = seq[j - 1][0] - start_ns + if window_dur_ns < min_window_ns: + continue + ratio = above / window_size + if ratio > best_ratio: + best_ratio = ratio + best_start_ns = start_ns + if best_start_ns is None or best_ratio < 0.5: + return None + return (best_start_ns, best_ratio) + + +def _wrap_pi(angle_rad: float) -> float: + """Wrap an angle delta into ``(-π, π]`` to handle yaw wrap-around.""" + twopi = 2.0 * math.pi + a = angle_rad % twopi + if a > math.pi: + a -= twopi + return a + + +# --------------------------------------------------------------------- +# Disk-reading wrappers (production paths) + + +_REQUIRED_TLOG_TYPES: tuple[str, ...] = ( + "RAW_IMU", + "SCALED_IMU2", + "ATTITUDE", +) + + +def _load_tlog_samples( + tlog_path: Path, + max_messages: int, + *, + source_factory: Callable[[str], Any] | None, +) -> TlogSamples: + """Stream the tlog head, capture IMU + ATTITUDE samples. + + Mirrors the AZ-399 source-factory test pattern: production builds + use ``pymavlink`` lazily; tests pass an in-memory fake. + """ + source = _open_tlog(tlog_path, source_factory=source_factory) + accel: list[tuple[int, float]] = [] + attitude: list[tuple[int, float, float, float]] = [] + counts: dict[str, int] = {} + try: + for _ in range(max_messages): + try: + msg = source.recv_match( + type=list(_REQUIRED_TLOG_TYPES), + blocking=False, + ) + except Exception as exc: # pragma: no cover — defensive. + raise ReplayInputAdapterError( + f"tlog scan failed on {tlog_path}: {exc!r}" + ) from exc + if msg is None: + break + msg_type = _safe_msg_type(msg) + if not msg_type: + continue + counts[msg_type] = counts.get(msg_type, 0) + 1 + ts_ns = _msg_timestamp_ns(msg) + if msg_type in ("RAW_IMU", "SCALED_IMU2"): + xa = float(getattr(msg, "xacc", 0.0)) / _MG_PER_G + ya = float(getattr(msg, "yacc", 0.0)) / _MG_PER_G + za = float(getattr(msg, "zacc", 0.0)) / _MG_PER_G + total_g = math.sqrt(xa * xa + ya * ya + za * za) + accel.append((ts_ns, total_g)) + elif msg_type == "ATTITUDE": + roll = float(getattr(msg, "roll", 0.0)) + pitch = float(getattr(msg, "pitch", 0.0)) + yaw = float(getattr(msg, "yaw", 0.0)) + attitude.append((ts_ns, roll, pitch, yaw)) + finally: + if hasattr(source, "close"): + try: + source.close() + except Exception: # pragma: no cover — defensive. + pass + return TlogSamples( + accel=tuple(accel), + attitude=tuple(attitude), + imu_count_by_type=counts, + ) + + +def _open_tlog( + tlog_path: Path, + *, + source_factory: Callable[[str], Any] | None, +) -> Any: + if source_factory is not None: + return source_factory(str(tlog_path)) + try: + from pymavlink import mavutil # type: ignore[import-not-found] + except ImportError as exc: + raise ReplayInputAdapterError( + "pymavlink is required for replay auto-sync but is not " + "importable in this binary" + ) from exc + return mavutil.mavlink_connection( + str(tlog_path), + dialect="ardupilotmega", + mavlink_version="2.0", + ) + + +def _safe_msg_type(msg: Any) -> str: + try: + if hasattr(msg, "get_type"): + return str(msg.get_type()) + except Exception: + return "" + return type(msg).__name__ + + +def _msg_timestamp_ns(msg: Any) -> int: + raw = getattr(msg, "_timestamp", None) + if raw is None: + raise ReplayInputAdapterError( + "tlog message missing _timestamp attribute; pymavlink " + "mavlogfile should populate it on every recv_match() return" + ) + return int(float(raw) * 1_000_000_000) + + +def _read_video_frames( + video_path: Path, + scan_seconds: float, +) -> Iterable[tuple[int, "np.ndarray"]]: + """Decode the leading ``scan_seconds`` of the video. + + Yields ``(monotonic_ns, frame_bgr)`` tuples where ``monotonic_ns`` + is the file's per-frame ``CAP_PROP_POS_MSEC × 1e6`` so the + returned timestamps align with what + :class:`VideoFileFrameSource` will report later. The Python + ``time.monotonic_ns()`` is NOT used — the auto-sync result has to + be deterministic across runs (AC-10) and tied to the video + timeline. + """ + try: + import cv2 as _cv2 # type: ignore[import-not-found] + except ImportError as exc: + raise ReplayInputAdapterError( + "opencv-python is required for replay auto-sync but is " + "not importable in this binary" + ) from exc + capture = _cv2.VideoCapture(str(video_path)) + if not capture.isOpened(): + capture.release() + raise ReplayInputAdapterError( + f"video file unreadable / unsupported codec: {video_path}" + ) + try: + max_pos_ms = scan_seconds * 1000.0 + while True: + ok, frame = capture.read() + if not ok or frame is None: + break + pos_ms = float(capture.get(_cv2.CAP_PROP_POS_MSEC)) + if pos_ms > max_pos_ms: + break + ts_ns = int(pos_ms * 1_000_000) + yield ts_ns, frame + finally: + capture.release() + + +def _compute_flow_magnitudes( + frames: list[tuple[int, "np.ndarray"]], +) -> tuple[tuple[int, float], ...]: + """Pairwise mean optical-flow magnitude between consecutive frames. + + Uses Farneback dense flow (``cv2.calcOpticalFlowFarneback``) + rather than pyramidal LK because Farneback returns a flow field + over the whole image with no per-frame feature-tracking state, so + the result is deterministic given the same input frames (AC-10). + + Returns ``((ts_ns_of_second_frame, mean_magnitude_px), ...)``. + """ + try: + import cv2 as _cv2 # type: ignore[import-not-found] + import numpy as _np # type: ignore[import-not-found] + except ImportError as exc: # pragma: no cover — guarded at call sites. + raise ReplayInputAdapterError( + "opencv-python + numpy are required for replay auto-sync" + ) from exc + if len(frames) < 2: + return () + # Convert all frames to grayscale once up-front so the per-pair + # cost is dominated by the optical-flow computation itself. + gray_frames = [] + for ts_ns, frame in frames: + gray = _cv2.cvtColor(frame, _cv2.COLOR_BGR2GRAY) + gray_frames.append((ts_ns, gray)) + out: list[tuple[int, float]] = [] + for i in range(1, len(gray_frames)): + prev_ts, prev = gray_frames[i - 1] + curr_ts, curr = gray_frames[i] + flow = _cv2.calcOpticalFlowFarneback( + prev, + curr, + None, + pyr_scale=0.5, + levels=3, + winsize=15, + iterations=3, + poly_n=5, + poly_sigma=1.2, + flags=0, + ) + # ``flow`` shape: (H, W, 2) — dx + dy per pixel. + magnitudes = _np.sqrt(flow[..., 0] ** 2 + flow[..., 1] ** 2) + mean_mag = float(magnitudes.mean()) + out.append((curr_ts, mean_mag)) + return tuple(out) + + +# Re-export the BUILD-flag check for symmetry with other replay modules. +def _build_flag_on(name: str) -> bool: + raw = os.environ.get(name, "") + return raw.strip().lower() in {"on", "1", "true", "yes"} diff --git a/src/gps_denied_onboard/replay_input/errors.py b/src/gps_denied_onboard/replay_input/errors.py new file mode 100644 index 0000000..2576974 --- /dev/null +++ b/src/gps_denied_onboard/replay_input/errors.py @@ -0,0 +1,38 @@ +"""``replay_input/`` error taxonomy (AZ-405 / E-DEMO-REPLAY). + +The coordinator surfaces a single error class so the shared main can +map every coordinator-scope failure to CLI exit code 2 (per epic +AZ-265 AC-8 and the v2.0.0 replay protocol). The class is a subclass +of :class:`RuntimeError` to keep stdlib-style ``except RuntimeError`` +catch sites (composition root) covering it without explicit imports. + +Translation rule: ``ReplayInputAdapter.open()`` re-raises strategy-side +exceptions — :class:`FcOpenError`, :class:`FrameSourceConfigError`, +:class:`FrameSourceError` — as :class:`ReplayInputAdapterError` after +re-shaping the message into the contract-mandated form (e.g. ``"tlog +missing required message types: [...]"``). The original is chained as +``__cause__`` so debug logs retain the underlying detail. +""" + +from __future__ import annotations + +__all__ = ["ReplayInputAdapterError"] + + +class ReplayInputAdapterError(RuntimeError): + """Base class for every :class:`ReplayInputAdapter` failure. + + Concrete failure modes (per epic AZ-265 + replay protocol v2.0.0): + + - ``"tlog missing required message types: [...]"`` — R-DEMO-3 + fail-fast at startup; raised from inside ``open()`` BEFORE the + video is read so a malformed tlog does not hang on + :class:`cv2.VideoCapture` initialisation. + - ``"auto-sync hard-fail: ..."`` — AC-8 frame-window match + violation; the resolved offset (auto OR manual) failed the + ≥ 95 % match threshold. + - ``"video file unreadable / unsupported codec / ..."`` — surfaced + from :class:`FrameSourceConfigError` raised by + :class:`VideoFileFrameSource` at coordinator scope so the CLI's + exit-code mapping stays single-source. + """ diff --git a/src/gps_denied_onboard/replay_input/interface.py b/src/gps_denied_onboard/replay_input/interface.py new file mode 100644 index 0000000..96a287e --- /dev/null +++ b/src/gps_denied_onboard/replay_input/interface.py @@ -0,0 +1,145 @@ +"""``replay_input/`` DTOs (AZ-405 / E-DEMO-REPLAY). + +Frozen + slotted dataclasses per ADR-002 / module-layout.md so the +composition root and the coordinator can pass these by value without +fear of mutation downstream. + +The DTOs come in two flavours: + +- :class:`AutoSyncConfig` — operator-tunable thresholds for the + auto-sync algorithm. The composition root builds an instance from + ``config.replay.auto_sync`` (owned by AZ-269 / AZ-270) and passes + it to :class:`ReplayInputAdapter`. Defaults match the contract + in :mod:`auto_sync` and the AC-1 / AC-2 / AC-3 thresholds. +- :class:`AutoSyncDecision` — the outcome of one auto-sync run. The + composition root attaches this to the FDR record so an operator can + audit how the offset was resolved. +- :class:`ReplayInputBundle` — the trio of strategies the composition + root consumes after :meth:`ReplayInputAdapter.open` returns. The + bundle also carries the resolved offset so the FDR write at the + start of the replay run can record provenance. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from gps_denied_onboard._types.fc import FcKind # noqa: F401 # for docstrings. + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( + TlogReplayFcAdapter, + ) + from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource + + +__all__ = [ + "AutoSyncConfig", + "AutoSyncDecision", + "ReplayInputBundle", +] + + +@dataclass(frozen=True, slots=True) +class AutoSyncConfig: + """Operator-tunable thresholds for the AZ-405 auto-sync algorithm. + + Defaults match the contract in + ``_docs/02_document/contracts/replay/replay_protocol.md`` v2.0.0 + and the AC-1 / AC-2 / AC-3 thresholds in the AZ-405 spec. + + Attributes: + takeoff_accel_threshold_g: Sustained vertical-acceleration + magnitude (in g) above which a tlog sample is considered + part of a take-off pattern. Default 0.5 (AC-1). + takeoff_attitude_rate_threshold_rad_s: Sustained attitude-rate + magnitude (rad/s) above which an ``ATTITUDE`` pair is + considered part of a take-off pattern. Default 1.0. + sustained_seconds: Minimum duration both signals must persist + above their thresholds for a candidate to be accepted. + Default 0.5. + prescan_max_messages: Upper bound on tlog messages walked by + the take-off detector. ~30 s of telemetry at 200 Hz = + 6000 messages, matching the AZ-399 pre-scan budget. + video_motion_threshold: Mean optical-flow magnitude (pixels) + above which a video frame pair is considered ``moving``. + Default 1.5 (calibrated for 720p footage). + video_motion_scan_seconds: Length of the leading video segment + inspected for the motion onset. Default 10.0 (AC-4 covers + an onset at frame 11 of a 60-frame fixture). + match_threshold_pct: AC-9 frame-window match-percentage + threshold (default 95.0). Configurable per + ``config.replay.auto_sync_match_threshold_pct``. + match_window_ms: AC-9 per-frame matching tolerance in + milliseconds (default 100). + low_confidence_threshold: Combined-confidence cut-off below + which :meth:`ReplayInputAdapter.open` logs WARN and uses + the best-guess offset (AC-6). Default 0.80. + """ + + takeoff_accel_threshold_g: float = 0.5 + takeoff_attitude_rate_threshold_rad_s: float = 1.0 + sustained_seconds: float = 0.5 + prescan_max_messages: int = 6000 + video_motion_threshold: float = 1.5 + video_motion_scan_seconds: float = 10.0 + match_threshold_pct: float = 95.0 + match_window_ms: int = 100 + low_confidence_threshold: float = 0.80 + + +@dataclass(frozen=True, slots=True) +class AutoSyncDecision: + """Outcome of one auto-sync run (AZ-405). + + Attributes: + offset_ms: Resolved offset to be applied to tlog timestamps. + ``offset_ms = tlog_takeoff_ns - video_motion_onset_ns`` + converted to milliseconds. + tlog_takeoff_ns: Detected tlog take-off timestamp. + video_motion_onset_ns: Detected video motion-onset timestamp. + tlog_confidence: Take-off detector confidence in [0, 1]. + video_confidence: Motion-onset detector confidence in [0, 1]. + combined_confidence: Aggregated confidence in [0, 1]. Below + :attr:`AutoSyncConfig.low_confidence_threshold` the + coordinator logs WARN and proceeds (AC-6). + """ + + offset_ms: int + tlog_takeoff_ns: int + video_motion_onset_ns: int + tlog_confidence: float + video_confidence: float + combined_confidence: float + + +@dataclass(frozen=True, slots=True) +class ReplayInputBundle: + """Trio of strategies returned by :meth:`ReplayInputAdapter.open`. + + The composition root wires the bundle into the same C1–C7 + C13 + pipeline as live (replay protocol Invariant 1 — the components + see only the standard :class:`FrameSource` / :class:`FcAdapter` / + :class:`Clock` interfaces past this point). + + Attributes: + frame_source: :class:`VideoFileFrameSource` instance ready + for ``next_frame()`` calls. + fc_adapter: :class:`TlogReplayFcAdapter` instance with its + decode thread already started by :meth:`open`. + clock: :class:`TlogDerivedClock` (pace=ASAP) or + :class:`WallClock` (pace=REALTIME). + resolved_time_offset_ms: Offset applied to tlog timestamps. + Equals either the ``manual_time_offset_ms`` constructor + argument or :attr:`AutoSyncDecision.offset_ms`. + auto_sync_result: Auto-sync outcome; ``None`` when the + constructor received an explicit + ``manual_time_offset_ms``. + """ + + frame_source: "VideoFileFrameSource" + fc_adapter: "TlogReplayFcAdapter" + clock: "Clock" + resolved_time_offset_ms: int + auto_sync_result: AutoSyncDecision | None diff --git a/src/gps_denied_onboard/replay_input/tlog_video_adapter.py b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py new file mode 100644 index 0000000..fcd78c4 --- /dev/null +++ b/src/gps_denied_onboard/replay_input/tlog_video_adapter.py @@ -0,0 +1,528 @@ +"""``ReplayInputAdapter`` (AZ-405 / E-DEMO-REPLAY). + +Layer-4 cross-cutting coordinator that converges ``(video, tlog)`` +inputs into the standard :class:`FrameSource`, :class:`FcAdapter`, +and :class:`Clock` surfaces consumed by the airborne composition +root. Owns the time-alignment concern: either the operator's manual +``--time-offset-ms`` override or the AZ-405 IMU-take-off auto-detect. + +``open()`` performs strict ordering so AC-13 holds: + +1. **Tlog message-type pre-validation** runs FIRST so a tlog missing + ``RAW_IMU`` / ``ATTITUDE`` raises before the video is ever read. +2. If the constructor received ``manual_time_offset_ms is None``, + the auto-sync detectors run; otherwise the manual offset is + adopted directly (AC-8 verifies the bypass). +3. The resolved offset is fed through the AC-9 frame-window match + validator; a hard-fail raises ``"auto-sync hard-fail: …"`` so + the shared main maps it to CLI exit code 2 (AC-7). +4. The :class:`Clock` strategy is constructed (``TlogDerivedClock`` + for ``pace=ASAP``, ``WallClock`` for ``pace=REALTIME``) — the + single instance the bundle ships to the composition root + (Invariant 2; AC-5). +5. :class:`VideoFileFrameSource` and :class:`TlogReplayFcAdapter` + are constructed against the offset + clock + dialect; the FC + adapter's own ``open()`` triggers its independent pre-scan (a + second sanity check; the operator gets the original error path + if step 1 was bypassed via a test fake). +6. The bundle is returned with ``auto_sync_result`` populated for + the auto path and ``None`` for the manual path. + +The coordinator is idempotent on ``close()`` — repeated calls are +no-ops once the underlying strategies have been released (AC-12). +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from gps_denied_onboard._types.fc import FcKind +from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + FcAdapterConfigError, + FcAdapterError, + FcOpenError, +) +from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( + ReplayPace, + TlogReplayFcAdapter, +) +from gps_denied_onboard.fdr_client.records import FdrRecord +from gps_denied_onboard.frame_source.errors import ( + FrameSourceConfigError, + FrameSourceError, +) +from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource +from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now +from gps_denied_onboard.replay_input.auto_sync import ( + _load_tlog_samples, + compute_offset, + detect_video_motion_onset, + validate_offset_or_fail, +) +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError +from gps_denied_onboard.replay_input.interface import ( + AutoSyncConfig, + AutoSyncDecision, + ReplayInputBundle, +) + +if TYPE_CHECKING: + from gps_denied_onboard._types.calibration import CameraCalibration + from gps_denied_onboard.clock import Clock + from gps_denied_onboard.fdr_client.client import FdrClient + from gps_denied_onboard.helpers.wgs_converter import WgsConverter + + +__all__ = ["ReplayInputAdapter"] + + +_FDR_PRODUCER_ID = "replay_input.tlog_video_adapter" + +_LOG_KIND_AUTO_SYNC_DETECTED = "replay.auto_sync.detected" +_LOG_KIND_AUTO_SYNC_LOW_CONF = "replay.auto_sync.low_confidence" +_LOG_KIND_AUTO_SYNC_AC8_FAIL = "replay.auto_sync.ac8_validation_failed" +_LOG_KIND_OPEN_MANUAL = "replay.input.opened_manual_offset" + + +class ReplayInputAdapter: + """Coordinator that converges ``(video, tlog)`` into the airborne strategies. + + Constructor parameters: + + - ``video_path`` / ``tlog_path`` — filesystem inputs. + - ``camera_calibration`` — :class:`CameraCalibration` used to + derive the calibration ID propagated into every emitted + :class:`NavCameraFrame`. + - ``target_fc_dialect`` — ``ARDUPILOT_PLANE`` or ``INAV``; + passed through to :class:`TlogReplayFcAdapter`. + - ``wgs_converter`` — shared geodesy helper, constructor-injected + into :class:`TlogReplayFcAdapter`. + - ``fdr_client`` — FDR sink for the TlogReplayFcAdapter and for + the coordinator's own structured-event mirror. + - ``pace`` — :class:`ReplayPace` (``ASAP`` or ``REALTIME``). + - ``manual_time_offset_ms`` — ``None`` triggers auto-sync; an + integer bypasses auto-sync entirely (AC-8). + - ``auto_sync_config`` — :class:`AutoSyncConfig` thresholds. + + Behaviour: + + - :meth:`open` resolves the offset, validates AC-9, and returns a + :class:`ReplayInputBundle` with the wired strategies. Raises + :class:`ReplayInputAdapterError` on every coordinator-scope + failure so the shared main can map cleanly to CLI exit code 2. + - :meth:`close` releases the FC adapter and the frame source; + idempotent (AC-12). + """ + + __slots__ = ( + "_video_path", + "_tlog_path", + "_camera_calibration", + "_target_fc_dialect", + "_wgs_converter", + "_fdr_client", + "_pace", + "_manual_time_offset_ms", + "_auto_sync_config", + "_tlog_source_factory", + "_video_frames_factory", + "_video_timestamps_factory", + "_log", + "_opened", + "_closed", + "_bundle", + ) + + def __init__( + self, + *, + video_path: Path, + tlog_path: Path, + camera_calibration: "CameraCalibration", + target_fc_dialect: FcKind, + wgs_converter: "WgsConverter", + fdr_client: "FdrClient", + pace: ReplayPace, + manual_time_offset_ms: int | None, + auto_sync_config: AutoSyncConfig, + tlog_source_factory: Any | None = None, + video_frames_factory: Any | None = None, + video_timestamps_factory: Any | None = None, + ) -> None: + if not isinstance(video_path, Path): + raise ReplayInputAdapterError( + f"video_path must be a pathlib.Path; got {type(video_path).__name__}" + ) + if not isinstance(tlog_path, Path): + raise ReplayInputAdapterError( + f"tlog_path must be a pathlib.Path; got {type(tlog_path).__name__}" + ) + if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV): + raise ReplayInputAdapterError( + f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; " + f"got {target_fc_dialect!r}" + ) + if not isinstance(pace, ReplayPace): + raise ReplayInputAdapterError( + f"pace must be a ReplayPace enum; got {type(pace).__name__}" + ) + self._video_path = video_path + self._tlog_path = tlog_path + self._camera_calibration = camera_calibration + self._target_fc_dialect = target_fc_dialect + self._wgs_converter = wgs_converter + self._fdr_client = fdr_client + self._pace = pace + self._manual_time_offset_ms = manual_time_offset_ms + self._auto_sync_config = auto_sync_config + self._tlog_source_factory = tlog_source_factory + self._video_frames_factory = video_frames_factory + self._video_timestamps_factory = video_timestamps_factory + self._log = logging.getLogger("replay_input.tlog_video_adapter") + self._opened = False + self._closed = False + self._bundle: ReplayInputBundle | None = None + + def open(self) -> ReplayInputBundle: + """Resolve the offset, build the strategies, return the bundle. + + Idempotent only in the failure-then-retry sense — calling + ``open()`` twice without an intervening ``close()`` raises + :class:`ReplayInputAdapterError`. + """ + if self._opened: + raise ReplayInputAdapterError("ReplayInputAdapter already opened") + + # Step 1 — tlog presence + required-message check (R-DEMO-3, + # AC-13). Runs BEFORE any video read so a malformed tlog + # surfaces without paying the cv2.VideoCapture cost. + tlog_imu_timestamps_ns, tlog_samples_for_auto = self._load_and_validate_tlog() + + # Step 2 — resolve the offset (auto-sync or manual override). + decision: AutoSyncDecision | None + if self._manual_time_offset_ms is None: + decision = self._run_auto_sync(tlog_samples_for_auto) + resolved_offset_ms = decision.offset_ms + else: + decision = None + resolved_offset_ms = int(self._manual_time_offset_ms) + self._log.info( + f"{_LOG_KIND_OPEN_MANUAL}: resolved_offset_ms={resolved_offset_ms}", + extra={ + "kind": _LOG_KIND_OPEN_MANUAL, + "kv": {"resolved_offset_ms": resolved_offset_ms}, + }, + ) + + # Step 3 — load video frame timestamps and run AC-9 validator. + video_frame_timestamps_ns = self._load_video_timestamps() + result_code = validate_offset_or_fail( + resolved_offset_ms, + tlog_imu_timestamps_ns, + video_frame_timestamps_ns, + threshold_pct=self._auto_sync_config.match_threshold_pct, + window_ms=self._auto_sync_config.match_window_ms, + ) + if result_code != 0: + self._raise_ac8_fail( + resolved_offset_ms, + len(tlog_imu_timestamps_ns), + len(video_frame_timestamps_ns), + ) + + # Step 4 — clock strategy (single instance per Invariant 2). + clock = self._build_clock() + + # Step 5 — concrete strategies. The frame source is built + # first because its constructor verifies the build flag and + # opens the cv2 capture handle — a failure here is a clean + # config error (no resources held). The FC adapter is built + # second; its open() launches the decode thread. + try: + frame_source = VideoFileFrameSource( + path=self._video_path, + camera_calibration_id=self._camera_calibration.camera_id, + clock=clock, + ) + except FrameSourceConfigError as exc: + raise ReplayInputAdapterError( + f"video file unreadable / unsupported codec: {self._video_path} " + f"({exc})" + ) from exc + except FrameSourceError as exc: + raise ReplayInputAdapterError( + f"video file decode error: {self._video_path} ({exc})" + ) from exc + + try: + fc_adapter = TlogReplayFcAdapter( + tlog_path=self._tlog_path, + target_fc_dialect=self._target_fc_dialect, + clock=clock, + wgs_converter=self._wgs_converter, + fdr_client=self._fdr_client, + time_offset_ms=resolved_offset_ms, + pace=self._pace, + source_factory=self._tlog_source_factory, + ) + fc_adapter.open() + except (FcOpenError, FcAdapterConfigError, FcAdapterError) as exc: + # Release the already-built frame source so we do not + # leak the cv2 handle when the FC adapter fails after + # the video was opened. + try: + frame_source.close() + except Exception: # pragma: no cover — defensive. + self._log.debug( + "ReplayInputAdapter: frame_source.close() during FC adapter rollback failed", + exc_info=True, + ) + # Translate the FC error into the coordinator's single + # public failure shape so the CLI exit-code mapping + # remains single-source. Pre-scan failures naturally + # surface the "tlog missing required messages: …" prefix + # the contract mandates. + raise ReplayInputAdapterError(str(exc)) from exc + + # Step 6 — assemble + record the bundle. + bundle = ReplayInputBundle( + frame_source=frame_source, + fc_adapter=fc_adapter, + clock=clock, + resolved_time_offset_ms=resolved_offset_ms, + auto_sync_result=decision, + ) + self._bundle = bundle + self._opened = True + return bundle + + def close(self) -> None: + """Release the FC adapter + frame source; idempotent (AC-12).""" + if self._closed: + self._log.debug( + "ReplayInputAdapter.close called twice; no-op" + ) + return + self._closed = True + bundle = self._bundle + self._bundle = None + if bundle is None: + return + try: + bundle.fc_adapter.close() + except Exception: # pragma: no cover — defensive. + self._log.debug( + "ReplayInputAdapter: fc_adapter.close() raised", exc_info=True + ) + try: + bundle.frame_source.close() + except Exception: # pragma: no cover — defensive. + self._log.debug( + "ReplayInputAdapter: frame_source.close() raised", exc_info=True + ) + + # ------------------------------------------------------------------ + # Internal helpers + + def _load_and_validate_tlog( + self, + ) -> tuple[list[int], Any]: + """Load tlog IMU + ATTITUDE samples; raise on missing types. + + Returns the IMU-only timestamp list (used by the AC-9 + validator) plus the full :class:`TlogSamples` so the auto- + sync path can reuse the same scan for take-off detection. + Raises :class:`ReplayInputAdapterError` for the R-DEMO-3 + missing-types path; this is the AC-13 fail-fast surface. + """ + if not self._tlog_path.is_file(): + raise ReplayInputAdapterError( + f"tlog file not found: {self._tlog_path}" + ) + samples = _load_tlog_samples( + self._tlog_path, + self._auto_sync_config.prescan_max_messages, + source_factory=self._tlog_source_factory, + ) + if not samples.accel: + raise ReplayInputAdapterError( + "tlog missing required message types: ['RAW_IMU', 'SCALED_IMU2']" + ) + if not samples.attitude: + raise ReplayInputAdapterError( + "tlog missing required message types: ['ATTITUDE']" + ) + return [ts for ts, _ in samples.accel], samples + + def _run_auto_sync(self, tlog_samples: Any) -> AutoSyncDecision: + """Auto path — compute the take-off / motion-onset / offset. + + Re-uses the already-loaded ``tlog_samples`` for the take-off + detector so the tlog is walked exactly once per ``open()`` + regardless of which path runs. + """ + from gps_denied_onboard.replay_input.auto_sync import ( + _compute_tlog_takeoff_from_samples, + ) + + tlog_result = _compute_tlog_takeoff_from_samples( + tlog_samples, self._auto_sync_config + ) + video_result = detect_video_motion_onset( + self._video_path, + self._auto_sync_config, + frames_factory=self._video_frames_factory, + ) + decision = compute_offset(tlog_result, video_result) + if decision.combined_confidence < self._auto_sync_config.low_confidence_threshold: + self._log_decision( + kind=_LOG_KIND_AUTO_SYNC_LOW_CONF, + level="WARN", + decision=decision, + extra_kv={"proceeding_with_best_guess": True}, + ) + else: + self._log_decision( + kind=_LOG_KIND_AUTO_SYNC_DETECTED, + level="INFO", + decision=decision, + extra_kv={}, + ) + return decision + + def _load_video_timestamps(self) -> list[int]: + """Decode the leading video segment, return per-frame timestamps. + + Used by the AC-9 frame-window match validator and as a + fallback when the auto-sync video scan was bypassed (manual + path). Stops at ``video_motion_scan_seconds`` so wildly long + clips do not hold up startup. + """ + if self._video_timestamps_factory is not None: + return list(self._video_timestamps_factory(self._video_path)) + try: + import cv2 as _cv2 # type: ignore[import-not-found] + except ImportError as exc: + raise ReplayInputAdapterError( + "opencv-python is required for replay auto-sync but is " + "not importable in this binary" + ) from exc + capture = _cv2.VideoCapture(str(self._video_path)) + if not capture.isOpened(): + capture.release() + raise ReplayInputAdapterError( + f"video file unreadable / unsupported codec: {self._video_path}" + ) + out: list[int] = [] + max_pos_ms = self._auto_sync_config.video_motion_scan_seconds * 1000.0 + try: + while True: + ok = capture.grab() + if not ok: + break + pos_ms = float(capture.get(_cv2.CAP_PROP_POS_MSEC)) + if pos_ms > max_pos_ms: + break + out.append(int(pos_ms * 1_000_000)) + finally: + capture.release() + return out + + def _build_clock(self) -> "Clock": + """Pick the :class:`Clock` strategy per pace; single instance. + + The ``TlogDerivedClock`` is constructed against an empty + iterable here: the composition root (AZ-401) is responsible + for hooking the clock's source up to the live tlog cursor + once the FC adapter's decode thread starts streaming. The + empty-source default keeps unit tests self-contained. + """ + if self._pace is ReplayPace.ASAP: + return TlogDerivedClock(source=iter([])) + return WallClock() + + def _log_decision( + self, + *, + kind: str, + level: str, + decision: AutoSyncDecision, + extra_kv: dict[str, Any], + ) -> None: + kv: dict[str, Any] = { + "tlog_takeoff_ns": decision.tlog_takeoff_ns, + "video_motion_onset_ns": decision.video_motion_onset_ns, + "offset_ms": decision.offset_ms, + "tlog_confidence": decision.tlog_confidence, + "video_confidence": decision.video_confidence, + "combined_confidence": decision.combined_confidence, + } + kv.update(extra_kv) + msg = f"{kind}: offset_ms={decision.offset_ms} confidence={decision.combined_confidence:.3f}" + if level == "WARN": + self._log.warning(msg, extra={"kind": kind, "kv": kv}) + else: + self._log.info(msg, extra={"kind": kind, "kv": kv}) + self._emit_fdr_event(level=level, log_kind=kind, msg=msg, kv=kv) + + def _raise_ac8_fail( + self, + offset_ms: int, + imu_count: int, + frame_count: int, + ) -> None: + kv = { + "offset_ms": offset_ms, + "frame_window_match_pct_threshold": self._auto_sync_config.match_threshold_pct, + "imu_sample_count": imu_count, + "video_frame_count": frame_count, + } + msg = ( + f"auto-sync hard-fail: frame-window match below " + f"{self._auto_sync_config.match_threshold_pct}% with " + f"offset_ms={offset_ms}" + ) + self._log.error( + f"{_LOG_KIND_AUTO_SYNC_AC8_FAIL}: {msg}", + extra={"kind": _LOG_KIND_AUTO_SYNC_AC8_FAIL, "kv": kv}, + ) + self._emit_fdr_event( + level="ERROR", log_kind=_LOG_KIND_AUTO_SYNC_AC8_FAIL, msg=msg, kv=kv + ) + raise ReplayInputAdapterError(msg) + + def _emit_fdr_event( + self, + *, + level: str, + log_kind: str, + msg: str, + kv: dict[str, Any], + ) -> None: + record = FdrRecord( + schema_version=1, + ts=iso_ts_now(), + producer_id=_FDR_PRODUCER_ID, + kind="log", + payload={ + "level": level, + "component": "replay_input", + "kind": log_kind, + "msg": msg, + "kv": kv, + }, + ) + try: + self._fdr_client.enqueue(record) + except Exception as exc: + self._log.debug( + f"replay_input.fdr_enqueue_failed: {exc!r}", + extra={ + "kind": "replay_input.fdr_enqueue_failed", + "kv": {"error": repr(exc), "downstream_kind": log_kind}, + }, + ) \ No newline at end of file diff --git a/tests/unit/replay_input/__init__.py b/tests/unit/replay_input/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/replay_input/test_az405_auto_sync.py b/tests/unit/replay_input/test_az405_auto_sync.py new file mode 100644 index 0000000..d4f12c3 --- /dev/null +++ b/tests/unit/replay_input/test_az405_auto_sync.py @@ -0,0 +1,483 @@ +"""AZ-405 — auto-sync detector + offset-compute + AC-9 validator. + +Covers AC-1..AC-10 of ``_docs/02_tasks/todo/AZ-405_replay_auto_sync.md``. + +Tests run against the pure compute kernels in +:mod:`gps_denied_onboard.replay_input.auto_sync` (no disk IO, no real +pymavlink, no real OpenCV) so the suite is fast + deterministic. + +Style: every test follows the Arrange / Act / Assert pattern. +""" + +from __future__ import annotations + +import math +from typing import Any + +import pytest + +from gps_denied_onboard.replay_input.auto_sync import ( + TlogSamples, + _compute_tlog_takeoff_from_samples, + _compute_video_onset_from_samples, + compute_offset, + validate_offset_or_fail, +) +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError +from gps_denied_onboard.replay_input.interface import AutoSyncConfig + + +# --------------------------------------------------------------------- +# Synthetic-fixture helpers + + +def _ns(seconds: float) -> int: + return int(seconds * 1_000_000_000) + + +def _flat_accel_samples( + *, start_s: float, end_s: float, hz: int, total_g: float +) -> list[tuple[int, float]]: + out: list[tuple[int, float]] = [] + period = 1.0 / hz + t = start_s + while t < end_s: + out.append((_ns(t), total_g)) + t += period + return out + + +def _flat_attitude_samples( + *, start_s: float, end_s: float, hz: int, roll: float, pitch: float, yaw: float +) -> list[tuple[int, float, float, float]]: + out: list[tuple[int, float, float, float]] = [] + period = 1.0 / hz + t = start_s + while t < end_s: + out.append((_ns(t), roll, pitch, yaw)) + t += period + return out + + +def _ramp_attitude_samples( + *, + start_s: float, + end_s: float, + hz: int, + base_roll: float, + base_pitch: float, + base_yaw: float, + rate_rad_s: float, +) -> list[tuple[int, float, float, float]]: + """Attitude that ramps in pitch at ``rate_rad_s`` rad/s.""" + out: list[tuple[int, float, float, float]] = [] + period = 1.0 / hz + t = start_s + while t < end_s: + dt = t - start_s + pitch = base_pitch + rate_rad_s * dt + out.append((_ns(t), base_roll, pitch, base_yaw)) + t += period + return out + + +def _build_takeoff_samples() -> TlogSamples: + """AC-1 fixture: 2 s flat hover, then 1.5 s sustained 2.2 g + 1.5 rad/s. + + Take-off onset is at t = 2.0 s (the first sample with the + elevated acceleration). Body-frame accelerometer convention: at + hover the proper-acceleration magnitude is 1 g (gravity reaction); + during a 1.2 g thrust climb it is 2.2 g, so the take-off excess + above 1 g rest is 1.2 g — well above the 0.5 g threshold. + """ + accel_pre = _flat_accel_samples(start_s=0.0, end_s=2.0, hz=200, total_g=1.0) + accel_post = _flat_accel_samples(start_s=2.0, end_s=3.5, hz=200, total_g=2.2) + accel = accel_pre + accel_post + + attitude_pre = _flat_attitude_samples( + start_s=0.0, end_s=2.0, hz=100, roll=0.0, pitch=0.0, yaw=0.0 + ) + attitude_post = _ramp_attitude_samples( + start_s=2.0, + end_s=3.5, + hz=100, + base_roll=0.0, + base_pitch=0.0, + base_yaw=0.0, + rate_rad_s=1.5, + ) + attitude = attitude_pre + attitude_post + + return TlogSamples( + accel=tuple(accel), + attitude=tuple(attitude), + imu_count_by_type={ + "RAW_IMU": len(accel), + "ATTITUDE": len(attitude), + }, + ) + + +def _build_low_amplitude_vibration_samples() -> TlogSamples: + """AC-2 fixture: 5 s of 0.3 g body-frame vibration (no take-off). + + Total proper-acceleration during vibration = 1.3 g (0.3 g excess + above the 1 g rest baseline) — strictly below the 0.5 g detector + threshold so the sustained-event search rejects every window. + """ + accel = _flat_accel_samples(start_s=0.0, end_s=5.0, hz=200, total_g=1.3) + attitude = _flat_attitude_samples( + start_s=0.0, end_s=5.0, hz=100, roll=0.0, pitch=0.0, yaw=0.0 + ) + return TlogSamples( + accel=tuple(accel), + attitude=tuple(attitude), + imu_count_by_type={ + "RAW_IMU": len(accel), + "ATTITUDE": len(attitude), + }, + ) + + +def _build_hand_launch_samples() -> TlogSamples: + """AC-3 fixture: 0.8 g impulse for 100 ms; not sustained for 0.5 s. + + Body-frame accelerometer convention (see ``_build_takeoff_samples``): + a 0.8 g impulse becomes 1.8 g total proper-acceleration during the + impulse window. + """ + accel_pre = _flat_accel_samples(start_s=0.0, end_s=2.0, hz=200, total_g=1.0) + accel_impulse = _flat_accel_samples( + start_s=2.0, end_s=2.1, hz=200, total_g=1.8 + ) + accel_post = _flat_accel_samples(start_s=2.1, end_s=3.0, hz=200, total_g=1.0) + accel = accel_pre + accel_impulse + accel_post + + attitude = _flat_attitude_samples( + start_s=0.0, end_s=3.0, hz=100, roll=0.0, pitch=0.0, yaw=0.0 + ) + return TlogSamples( + accel=tuple(accel), + attitude=tuple(attitude), + imu_count_by_type={ + "RAW_IMU": len(accel), + "ATTITUDE": len(attitude), + }, + ) + + +def _flow_samples_from_frames( + *, n_stationary: int, n_moving: int, fps: int = 30, motion_px: float = 4.0 +) -> tuple[tuple[int, float], ...]: + """Synthesise the flow-magnitude series the video detector consumes. + + The detector emits a ``(ts_ns, mean_magnitude_px)`` tuple for each + consecutive frame pair (skipping the first frame's pair). For + AC-4 we pretend frames 1..10 are stationary (mag ≈ 0) and frames + 11..60 are moving (mag = motion_px). + """ + out: list[tuple[int, float]] = [] + period_ns = int(1_000_000_000 / fps) + for i in range(1, n_stationary): + out.append((i * period_ns, 0.05)) + for j in range(n_stationary, n_stationary + n_moving): + out.append((j * period_ns, motion_px)) + return tuple(out) + + +# --------------------------------------------------------------------- +# AC-1 — tlog take-off detector (positive) + + +def test_ac1_tlog_takeoff_detector_positive_within_50ms_and_high_confidence() -> None: + # Arrange + config = AutoSyncConfig() + samples = _build_takeoff_samples() + + # Act + result = _compute_tlog_takeoff_from_samples(samples, config) + + # Assert + expected_onset_ns = _ns(2.0) + assert abs(result.onset_ns - expected_onset_ns) <= _ns(0.05), ( + f"detected onset {result.onset_ns / 1e9}s deviates >50ms from expected 2.0s" + ) + assert result.confidence >= 0.85, ( + f"confidence {result.confidence} below AC-1 minimum of 0.85" + ) + + +# --------------------------------------------------------------------- +# AC-2 — tlog take-off detector (ambiguous) + + +def test_ac2_tlog_takeoff_detector_low_amplitude_vibration_low_confidence() -> None: + # Arrange + config = AutoSyncConfig() + samples = _build_low_amplitude_vibration_samples() + + # Act + result = _compute_tlog_takeoff_from_samples(samples, config) + + # Assert + assert result.confidence < 0.50, ( + f"confidence {result.confidence} should be < 0.50 for ambiguous vibration" + ) + + +# --------------------------------------------------------------------- +# AC-3 — tlog take-off detector (hand launch) + + +def test_ac3_tlog_takeoff_detector_hand_launch_warn_regime() -> None: + # Arrange + config = AutoSyncConfig() + samples = _build_hand_launch_samples() + + # Act + result = _compute_tlog_takeoff_from_samples(samples, config) + + # Assert + assert result.confidence < 0.80, ( + f"confidence {result.confidence} should be < 0.80 for unsustained hand-launch" + ) + + +# --------------------------------------------------------------------- +# AC-4 — video motion-onset detector + + +def test_ac4_video_motion_onset_detected_within_one_frame() -> None: + # Arrange + config = AutoSyncConfig() + flow_samples = _flow_samples_from_frames(n_stationary=10, n_moving=50, fps=30) + period_ns = int(1_000_000_000 / 30) + expected_onset_ns = 10 * period_ns + + # Act + result = _compute_video_onset_from_samples(flow_samples, config) + + # Assert + assert abs(result.onset_ns - expected_onset_ns) <= period_ns, ( + f"detected motion onset {result.onset_ns} ns deviates >1 frame " + f"from expected {expected_onset_ns} ns" + ) + assert result.confidence > 0.80, ( + f"confidence {result.confidence} too low for clear motion onset" + ) + + +# --------------------------------------------------------------------- +# AC-5 — combined offset within ± 200 ms + + +def test_ac5_combined_offset_within_200ms_of_ground_truth() -> None: + # Arrange + config = AutoSyncConfig() + tlog_samples = _build_takeoff_samples() + tlog_result = _compute_tlog_takeoff_from_samples(tlog_samples, config) + + flow_samples = _flow_samples_from_frames(n_stationary=10, n_moving=50, fps=30) + video_result = _compute_video_onset_from_samples(flow_samples, config) + + # Ground-truth offset = tlog take-off (2.0 s) − video onset (10/30 s) + period_ns = int(1_000_000_000 / 30) + ground_truth_offset_ms = (_ns(2.0) - 10 * period_ns) // 1_000_000 + + # Act + decision = compute_offset(tlog_result, video_result) + + # Assert + assert abs(decision.offset_ms - ground_truth_offset_ms) <= 200, ( + f"offset {decision.offset_ms} ms deviates >200 ms from ground truth " + f"{ground_truth_offset_ms} ms" + ) + + +# --------------------------------------------------------------------- +# AC-6 — low combined confidence (verified via the coordinator test +# in test_az405_replay_input_adapter.py; here we only verify the +# combined-confidence aggregator picks min()) + + +def test_ac6_combined_confidence_takes_minimum_of_inputs() -> None: + # Arrange + from gps_denied_onboard.replay_input.auto_sync import _DetectorResult + + high = _DetectorResult(onset_ns=_ns(1.0), confidence=0.95) + low = _DetectorResult(onset_ns=_ns(2.0), confidence=0.50) + + # Act + decision = compute_offset(high, low) + + # Assert + assert decision.combined_confidence == pytest.approx(0.50) + assert decision.offset_ms == (_ns(1.0) - _ns(2.0)) // 1_000_000 + + +# --------------------------------------------------------------------- +# AC-7 — AC-9 validator hard-fail (the coordinator-level raise is +# covered in test_az405_replay_input_adapter.py) + + +def test_ac7_validator_hard_fail_returns_2_for_offset_outside_window() -> None: + # Arrange + fps = 30 + period_ns = int(1_000_000_000 / fps) + video_ts = [i * period_ns for i in range(60)] + # IMU sampled at 200 Hz from t=0 to t=2 (mismatch deliberate; the + # bad offset shifts everything outside the window). + imu_ts = [int(i / 200 * 1_000_000_000) for i in range(400)] + bad_offset_ms = 60_000 + + # Act + code = validate_offset_or_fail( + bad_offset_ms, + imu_ts, + video_ts, + threshold_pct=95.0, + window_ms=100, + ) + + # Assert + assert code == 2 + + +# --------------------------------------------------------------------- +# AC-9 — frame-window match-percentage validator (positive) + + +def test_ac9_validator_passes_for_well_matched_offset() -> None: + # Arrange + fps = 30 + period_ns = int(1_000_000_000 / fps) + video_ts = [i * period_ns for i in range(60)] + # IMU samples densely spanning the same time range — every video + # frame has an IMU sample within ± 100 ms. + imu_ts = [int(i / 200 * 1_000_000_000) for i in range(60 * 200 // 30)] + + # Act + code = validate_offset_or_fail( + 0, imu_ts, video_ts, threshold_pct=95.0, window_ms=100 + ) + + # Assert + assert code == 0 + + +def test_ac9_threshold_configurable() -> None: + # Arrange — set up a series where exactly 80% of frames match. + fps = 30 + period_ns = int(1_000_000_000 / fps) + video_ts = [i * period_ns for i in range(50)] + # IMU only covers the first 80% of the video timeline; the last + # 10 frames will be far outside the window. + imu_ts = [ + int(i / 200 * 1_000_000_000) for i in range(int(40 / 30 * 200)) + ] + + # Act / Assert + # Default 95% threshold → fail (80% < 95%). + assert validate_offset_or_fail( + 0, imu_ts, video_ts, threshold_pct=95.0, window_ms=100 + ) == 2 + # Lowered to 75% → pass. + assert validate_offset_or_fail( + 0, imu_ts, video_ts, threshold_pct=75.0, window_ms=100 + ) == 0 + + +# --------------------------------------------------------------------- +# AC-10 — confidence determinism + + +def test_ac10_confidence_score_deterministic_across_two_runs() -> None: + # Arrange + config = AutoSyncConfig() + samples = _build_takeoff_samples() + + # Act + first = _compute_tlog_takeoff_from_samples(samples, config) + second = _compute_tlog_takeoff_from_samples(samples, config) + + # Assert + assert first.onset_ns == second.onset_ns + assert math.isclose(first.confidence, second.confidence, abs_tol=1e-9) + + +def test_ac10_video_onset_deterministic_across_two_runs() -> None: + # Arrange + config = AutoSyncConfig() + flow_samples = _flow_samples_from_frames(n_stationary=5, n_moving=20, fps=30) + + # Act + first = _compute_video_onset_from_samples(flow_samples, config) + second = _compute_video_onset_from_samples(flow_samples, config) + + # Assert + assert first.onset_ns == second.onset_ns + assert math.isclose(first.confidence, second.confidence, abs_tol=1e-9) + + +# --------------------------------------------------------------------- +# R-DEMO-3 fail-fast on the pure compute path + + +def test_pure_takeoff_kernel_raises_on_no_imu_samples() -> None: + # Arrange + config = AutoSyncConfig() + samples = TlogSamples( + accel=(), + attitude=(), + imu_count_by_type={"ATTITUDE": 100}, + ) + + # Act / Assert + with pytest.raises(ReplayInputAdapterError, match="tlog missing required"): + _compute_takeoff_or_propagate(samples, config) + + +def test_pure_takeoff_kernel_raises_on_no_attitude_samples() -> None: + # Arrange + config = AutoSyncConfig() + accel = _flat_accel_samples(start_s=0.0, end_s=1.0, hz=200, total_g=1.0) + samples = TlogSamples( + accel=tuple(accel), + attitude=(), + imu_count_by_type={"RAW_IMU": len(accel)}, + ) + + # Act / Assert + with pytest.raises(ReplayInputAdapterError, match="tlog missing required"): + _compute_takeoff_or_propagate(samples, config) + + +def _compute_takeoff_or_propagate(samples: TlogSamples, config: AutoSyncConfig) -> Any: + """Local trampoline so the assertions are explicit even if the + underscore-named helper migrates.""" + return _compute_tlog_takeoff_from_samples(samples, config) + + +# --------------------------------------------------------------------- +# AC-9 edge cases + + +def test_validator_returns_2_on_empty_video_or_tlog() -> None: + # Arrange + imu_ts = [0, 1_000_000, 2_000_000] + video_ts: list[int] = [] + + # Act / Assert — empty video + assert ( + validate_offset_or_fail( + 0, imu_ts, video_ts, threshold_pct=95.0, window_ms=100 + ) + == 2 + ) + # Empty tlog + assert ( + validate_offset_or_fail( + 0, [], [0, 1_000_000], threshold_pct=95.0, window_ms=100 + ) + == 2 + ) diff --git a/tests/unit/replay_input/test_az405_replay_input_adapter.py b/tests/unit/replay_input/test_az405_replay_input_adapter.py new file mode 100644 index 0000000..f1bad9b --- /dev/null +++ b/tests/unit/replay_input/test_az405_replay_input_adapter.py @@ -0,0 +1,729 @@ +"""AZ-405 — ``ReplayInputAdapter`` coordinator unit tests. + +Covers AC-6 (low-confidence WARN-and-proceed), AC-7 (AC-8 hard-fail), +AC-8 (manual override bypass), AC-11 (open() returns a complete +bundle), AC-12 (idempotent close), and AC-13 (R-DEMO-3 fail-fast on +missing tlog message types). + +Synthetic videos use the same OpenCV-driven fixture pattern as +``tests/unit/frame_source/test_protocol_conformance.py``; the tlog +side is faked via the coordinator's ``tlog_source_factory`` injection +point so tests run without a real pymavlink connection. + +Style: every test follows the Arrange / Act / Assert pattern. +""" + +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace +from typing import Any +from unittest import mock + +import cv2 +import numpy as np +import pytest + +from gps_denied_onboard._types.calibration import CameraCalibration +from gps_denied_onboard._types.fc import FcKind +from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock +from gps_denied_onboard.clock.wall_clock import WallClock +from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import ( + ReplayPace, + TlogReplayFcAdapter, +) +from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource +from gps_denied_onboard.replay_input.errors import ReplayInputAdapterError +from gps_denied_onboard.replay_input.interface import ( + AutoSyncConfig, + AutoSyncDecision, + ReplayInputBundle, +) +from gps_denied_onboard.replay_input.tlog_video_adapter import ReplayInputAdapter + + +# --------------------------------------------------------------------- +# Fixtures + + +@pytest.fixture(autouse=True) +def _enable_build_flags(monkeypatch: pytest.MonkeyPatch) -> None: + """Both downstream strategies are gated by build flags (AZ-398 / AZ-399).""" + monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "ON") + monkeypatch.setenv("BUILD_TLOG_REPLAY_ADAPTER", "ON") + + +@pytest.fixture +def fake_fdr_client() -> mock.MagicMock: + return mock.MagicMock(name="FdrClient") + + +@pytest.fixture +def fake_wgs_converter() -> mock.MagicMock: + return mock.MagicMock(name="WgsConverter") + + +@pytest.fixture +def camera_calibration() -> CameraCalibration: + return CameraCalibration( + camera_id="az405-test", + intrinsics_3x3=None, + distortion=None, + body_to_camera_se3=None, + acquisition_method="synthetic", + ) + + +def _make_synthetic_video(path: Path, n_frames: int = 60, fps: int = 30) -> Path: + """Write a 64×48 BGR MP4V file at ``path`` and return it.""" + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + writer = cv2.VideoWriter(str(path), fourcc, fps, (64, 48)) + if not writer.isOpened(): + raise RuntimeError(f"OpenCV could not open writer at {path!s}") + try: + for i in range(n_frames): + frame = np.full((48, 64, 3), i % 256, dtype=np.uint8) + writer.write(frame) + finally: + writer.release() + return path + + +@pytest.fixture +def synthetic_video(tmp_path: Path) -> Path: + return _make_synthetic_video(tmp_path / "az405-video.mp4", n_frames=60, fps=30) + + +@pytest.fixture +def synthetic_tlog_path(tmp_path: Path) -> Path: + p = tmp_path / "az405.tlog" + p.write_bytes(b"fake-tlog") + return p + + +# --------------------------------------------------------------------- +# Fake pymavlink source + + +def _ns(seconds: float) -> int: + return int(seconds * 1_000_000_000) + + +def _fake_msg(msg_type: str, *, ts_s: float, **fields: Any) -> SimpleNamespace: + ns = SimpleNamespace(_timestamp=ts_s, **fields) + ns.get_type = lambda: msg_type + return ns + + +def _build_takeoff_messages( + *, + accel_pre_total_g: float = 1.0, + accel_post_total_g: float = 2.2, + accel_hz: int = 200, + pre_seconds: float = 2.0, + post_seconds: float = 1.5, +) -> list[SimpleNamespace]: + """A short tlog stream with a clear take-off pattern + GPS + heartbeat.""" + out: list[SimpleNamespace] = [] + accel_period = 1.0 / accel_hz + # Pre-takeoff: 1 g hover (z-acc = -1g in body, after sign). + t = 0.0 + while t < pre_seconds: + out.append( + _fake_msg( + "RAW_IMU", + ts_s=t, + xacc=0, + yacc=0, + zacc=-int(accel_pre_total_g * 1000), + xgyro=0, + ygyro=0, + zgyro=0, + ) + ) + t += accel_period + # Post-takeoff: 2.2 g sustained climb thrust. + while t < pre_seconds + post_seconds: + out.append( + _fake_msg( + "RAW_IMU", + ts_s=t, + xacc=0, + yacc=0, + zacc=-int(accel_post_total_g * 1000), + xgyro=0, + ygyro=0, + zgyro=0, + ) + ) + t += accel_period + + # Attitude: flat hover then 1.5 rad/s pitch ramp. + t = 0.0 + attitude_period = 1.0 / 100.0 + while t < pre_seconds: + out.append( + _fake_msg("ATTITUDE", ts_s=t, roll=0.0, pitch=0.0, yaw=0.0) + ) + t += attitude_period + pitch_rate = 1.5 + while t < pre_seconds + post_seconds: + dt = t - pre_seconds + out.append( + _fake_msg( + "ATTITUDE", + ts_s=t, + roll=0.0, + pitch=pitch_rate * dt, + yaw=0.0, + ) + ) + t += attitude_period + + # GPS_RAW_INT + HEARTBEAT (required by AZ-399 pre-scan). + out.append( + _fake_msg( + "GPS_RAW_INT", + ts_s=0.0, + fix_type=3, + lat=499910000, + lon=362210000, + alt=153_400, + ) + ) + out.append(_fake_msg("HEARTBEAT", ts_s=0.0, system_status=4, base_mode=0)) + out.sort(key=lambda m: m._timestamp) + return out + + +class _FakeTlog: + """Minimal pymavlink ``mavlink_connection`` stand-in. + + Returns each stored message once on ``recv_match``; ignores the + ``type=`` filter (the AZ-399 decode loop receives unfiltered + HEARTBEAT/IMU/ATTITUDE/GPS streams). + """ + + def __init__(self, messages: list[SimpleNamespace]) -> None: + self._iter = iter(messages) + self.closed = False + + def recv_match(self, **_kwargs: Any) -> SimpleNamespace | None: + return next(self._iter, None) + + def close(self) -> None: + self.closed = True + + +def _factory_for(messages: list[SimpleNamespace]) -> Any: + """Return a source factory that yields a fresh ``_FakeTlog`` per call. + + The coordinator opens the tlog twice (once for ``_load_tlog_samples`` + in the auto-sync path, once via the FC adapter's pre-scan + decode + handles), so the messages have to be re-emittable. + """ + + def _factory(_path: str) -> _FakeTlog: + return _FakeTlog(list(messages)) + + return _factory + + +def _frames_factory_with_motion( + *, + n_stationary: int = 10, + n_moving: int = 50, + fps: int = 30, +) -> Any: + """Return a frames_factory yielding the AC-4 motion-onset shape.""" + period_ns = int(1_000_000_000 / fps) + rng = np.random.default_rng(seed=0) + + def _factory(_path: Path, _scan_seconds: float) -> Any: + out: list[tuple[int, np.ndarray]] = [] + # Stationary: identical frames so optical flow ≈ 0. + base = np.full((48, 64, 3), 128, dtype=np.uint8) + for i in range(n_stationary): + out.append((i * period_ns, base.copy())) + # Moving: each frame replaces a 16×16 patch at a random offset + # so Farneback returns a clear non-zero magnitude. Determinism + # is preserved by the seeded RNG. + for j in range(n_moving): + frame = base.copy() + r = rng.integers(0, 32) + c = rng.integers(0, 48) + frame[r : r + 16, c : c + 16, :] = 240 + out.append(((n_stationary + j) * period_ns, frame)) + return out + + return _factory + + +def _video_timestamps_factory( + *, + n_frames: int = 60, + fps: int = 30, +) -> Any: + """Return a timestamps_factory with deterministic per-frame ts (ns).""" + period_ns = int(1_000_000_000 / fps) + + def _factory(_path: Path) -> list[int]: + return [i * period_ns for i in range(n_frames)] + + return _factory + + +# --------------------------------------------------------------------- +# AC-11 — open() returns a complete bundle + + +def test_ac11_open_returns_complete_bundle_with_correct_strategies( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + try: + bundle = adapter.open() + + # Assert + assert isinstance(bundle, ReplayInputBundle) + assert isinstance(bundle.frame_source, VideoFileFrameSource) + assert isinstance(bundle.fc_adapter, TlogReplayFcAdapter) + assert isinstance(bundle.clock, TlogDerivedClock) + assert bundle.resolved_time_offset_ms == 0 + # Manual path → no auto-sync result. + assert bundle.auto_sync_result is None + finally: + adapter.close() + + +def test_ac11_pace_realtime_yields_wall_clock( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.REALTIME, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + try: + bundle = adapter.open() + + # Assert + assert isinstance(bundle.clock, WallClock) + finally: + adapter.close() + + +def test_ac11_pace_asap_yields_tlog_derived_clock( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + try: + bundle = adapter.open() + + # Assert + assert isinstance(bundle.clock, TlogDerivedClock) + finally: + adapter.close() + + +# --------------------------------------------------------------------- +# AC-12 — idempotent close + + +def test_ac12_close_is_idempotent( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + adapter.open() + + # Act / Assert — both calls must complete without raising. + adapter.close() + adapter.close() + + +def test_close_without_open_does_not_raise( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + ) + + # Act / Assert + adapter.close() + + +# --------------------------------------------------------------------- +# AC-13 — missing tlog messages fail fast + + +def test_ac13_missing_imu_messages_fails_fast_before_video_read( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange — tlog has only ATTITUDE; no RAW_IMU / SCALED_IMU2. + attitude_only = [ + _fake_msg("ATTITUDE", ts_s=t * 0.01, roll=0.0, pitch=0.0, yaw=0.0) + for t in range(100) + ] + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(attitude_only), + ) + + # Act / Assert + with pytest.raises( + ReplayInputAdapterError, match="tlog missing required message types" + ): + adapter.open() + + +def test_ac13_missing_attitude_messages_fails_fast( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange — tlog has only RAW_IMU; no ATTITUDE. + imu_only = [ + _fake_msg( + "RAW_IMU", + ts_s=t * 0.005, + xacc=0, + yacc=0, + zacc=-1000, + xgyro=0, + ygyro=0, + zgyro=0, + ) + for t in range(100) + ] + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=0, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(imu_only), + ) + + # Act / Assert + with pytest.raises( + ReplayInputAdapterError, match=r"tlog missing required message types.*ATTITUDE" + ): + adapter.open() + + +# --------------------------------------------------------------------- +# AC-8 — manual override bypasses auto-detect + + +def test_ac8_manual_override_bypasses_auto_detect( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange + detect_calls: list[Any] = [] + + def _explode_if_called(*args: Any, **kwargs: Any) -> Any: + detect_calls.append((args, kwargs)) + raise AssertionError( + "auto-sync detector called even though manual_time_offset_ms was set" + ) + + monkeypatch.setattr( + "gps_denied_onboard.replay_input.tlog_video_adapter.detect_video_motion_onset", + _explode_if_called, + ) + + # Patch the take-off compute kernel referenced via the helper; the + # coordinator's manual path must skip it entirely. + monkeypatch.setattr( + "gps_denied_onboard.replay_input.auto_sync._compute_tlog_takeoff_from_samples", + _explode_if_called, + ) + + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=500, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + try: + bundle = adapter.open() + + # Assert — detector helpers were NOT invoked. + assert detect_calls == [] + assert bundle.resolved_time_offset_ms == 500 + assert bundle.auto_sync_result is None + finally: + adapter.close() + + +# --------------------------------------------------------------------- +# AC-7 — AC-8 hard-fail raises + + +def test_ac7_ac8_validator_hard_fail_raises_on_open( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, +) -> None: + # Arrange — manual offset of 60 s will push every video frame + # outside the IMU coverage window (the fake tlog only carries + # ~3.5 s of samples). + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=60_000, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act / Assert + with pytest.raises(ReplayInputAdapterError, match="auto-sync hard-fail"): + adapter.open() + + +# --------------------------------------------------------------------- +# AC-6 — low combined confidence WARN-and-proceed + + +def test_ac6_low_confidence_warn_and_proceed_does_not_raise( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange — stub the detectors to return low-confidence results. + from gps_denied_onboard.replay_input.auto_sync import _DetectorResult + + low_conf = _DetectorResult(onset_ns=_ns(2.0), confidence=0.40) + + def _stub_take_off(*args: Any, **kwargs: Any) -> _DetectorResult: + return low_conf + + def _stub_motion_onset(*args: Any, **kwargs: Any) -> _DetectorResult: + return _DetectorResult(onset_ns=_ns(2.0), confidence=0.40) + + monkeypatch.setattr( + "gps_denied_onboard.replay_input.auto_sync._compute_tlog_takeoff_from_samples", + _stub_take_off, + ) + monkeypatch.setattr( + "gps_denied_onboard.replay_input.tlog_video_adapter.detect_video_motion_onset", + _stub_motion_onset, + ) + + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=None, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + caplog.set_level("WARNING", logger="replay_input.tlog_video_adapter") + try: + bundle = adapter.open() + + # Assert — open() returned the bundle (didn't raise) and the + # WARN log fired. + assert bundle.auto_sync_result is not None + assert bundle.auto_sync_result.combined_confidence == pytest.approx(0.40) + warn_kinds = [ + r.kind for r in caplog.records if hasattr(r, "kind") + ] + assert "replay.auto_sync.low_confidence" in warn_kinds + finally: + adapter.close() + + +def test_ac11_resolved_offset_matches_auto_sync_result( + synthetic_video: Path, + synthetic_tlog_path: Path, + camera_calibration: CameraCalibration, + fake_wgs_converter: mock.MagicMock, + fake_fdr_client: mock.MagicMock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Arrange — high-confidence stubs so AC-6 WARN does not fire. + from gps_denied_onboard.replay_input.auto_sync import _DetectorResult + + def _stub_take_off(*args: Any, **kwargs: Any) -> _DetectorResult: + return _DetectorResult(onset_ns=_ns(2.0), confidence=0.95) + + def _stub_motion_onset(*args: Any, **kwargs: Any) -> _DetectorResult: + return _DetectorResult(onset_ns=_ns(0.333), confidence=0.95) + + monkeypatch.setattr( + "gps_denied_onboard.replay_input.auto_sync._compute_tlog_takeoff_from_samples", + _stub_take_off, + ) + monkeypatch.setattr( + "gps_denied_onboard.replay_input.tlog_video_adapter.detect_video_motion_onset", + _stub_motion_onset, + ) + + messages = _build_takeoff_messages() + adapter = ReplayInputAdapter( + video_path=synthetic_video, + tlog_path=synthetic_tlog_path, + camera_calibration=camera_calibration, + target_fc_dialect=FcKind.ARDUPILOT_PLANE, + wgs_converter=fake_wgs_converter, + fdr_client=fake_fdr_client, + pace=ReplayPace.ASAP, + manual_time_offset_ms=None, + auto_sync_config=AutoSyncConfig(), + tlog_source_factory=_factory_for(messages), + video_timestamps_factory=_video_timestamps_factory(), + ) + + # Act + try: + bundle = adapter.open() + + # Assert + expected_offset_ms = (_ns(2.0) - _ns(0.333)) // 1_000_000 + assert bundle.resolved_time_offset_ms == expected_offset_ms + assert bundle.auto_sync_result is not None + assert bundle.auto_sync_result.offset_ms == expected_offset_ms + finally: + adapter.close()