from pathlib import Path import pytest from e2e.replay.harness import ( BlackboxReplayRunner, ScenarioConfig, ScenarioGroup, ScenarioResult, validate_derkachi_alignment, ) from shared.contracts import FramePacket, TelemetrySample from vio_adapter import LocalVioAdapter, VioInputPacket def test_derkachi_alignment_validator_accepts_expected_fixture_shape() -> None: # Act metrics = validate_derkachi_alignment( video_duration_s=490.07, telemetry_duration_s=490.07, telemetry_rows=4_900, ) # Assert assert metrics["alignment_valid"] is True assert metrics["duration_delta_s"] == 0.0 assert metrics["frames_per_telemetry"] == pytest.approx(3.0, abs=0.05) def test_derkachi_alignment_validator_blocks_duration_drift() -> None: # Act / Assert with pytest.raises(ValueError, match="more than 250 ms"): validate_derkachi_alignment( video_duration_s=490.07, telemetry_duration_s=489.50, telemetry_rows=4_900, ) def test_public_vio_replay_boundary_emits_frame_by_frame_estimate() -> None: # Arrange adapter = LocalVioAdapter() frame = FramePacket( frame_id="derkachi-0001", timestamp_ns=1_000_000_000, image_ref="_docs/00_problem/input_data/flight_derkachi/flight_derkachi.mp4#0", calibration_id="derkachi-calibration-gated", occlusion="clear", quality=0.9, ) telemetry = ( TelemetrySample( timestamp_ns=1_000_000_000, imu={"accel_x": 0.0, "accel_y": 0.0, "accel_z": -9.8}, attitude={"roll": 0.0, "pitch": 0.0, "yaw": 1.0}, altitude_m=400.0, airspeed_mps=22.0, gps_health="healthy", ), ) # Act result = adapter.process(VioInputPacket(frame=frame, telemetry_samples=telemetry)) # Assert assert result.state_packet is not None assert result.health.state == "ready" assert result.state_packet.timestamp_ns == frame.timestamp_ns assert result.state_packet.tracking_quality > 0.0 def test_public_dataset_and_calibration_prerequisites_are_reported_blocked(tmp_path: Path) -> None: # Arrange scenario = ScenarioConfig( scenario_id="FT-P-03-CALIBRATION", name="Calibration-gated public VIO dataset", group=ScenarioGroup.PERFORMANCE, input_dataset="public_nadir_vio_candidates", required_paths=(tmp_path / "camera_intrinsics.yaml",), ) # Act result = BlackboxReplayRunner(output_root=tmp_path, scenarios=(scenario,)).run() # Assert report = result.reports[0] assert report.result == ScenarioResult.BLOCKED assert "camera_intrinsics.yaml" in report.error_message