from shared.contracts import FramePacket, TelemetrySample
from vio_adapter import LocalVioAdapter, VioInputPacket


def _frame(**overrides: object) -> FramePacket:
    """Build a validated ``FramePacket`` from a baseline payload.

    Keyword arguments replace (or extend) the baseline fields before the
    merged payload is passed to ``FramePacket.model_validate``.
    """
    baseline: dict[str, object] = {
        "frame_id": "frame-1",
        "timestamp_ns": 1_000_000,
        "image_ref": "replay/frame-1.jpg",
        "calibration_id": "calib-1",
        "occlusion": "clear",
        "quality": 0.85,
    }
    # Later unpacking wins, so overrides take precedence over the baseline.
    return FramePacket.model_validate({**baseline, **overrides})


def _telemetry(timestamp_ns: int = 1_000_000) -> TelemetrySample:
    """Build a ``TelemetrySample`` with fixed readings at ``timestamp_ns``."""
    imu_reading = {"accel_x": 0.1, "accel_y": 0.0, "accel_z": 9.8}
    attitude_reading = {"roll": 0.0, "pitch": 0.01, "yaw": 0.02}
    return TelemetrySample(
        timestamp_ns=timestamp_ns,
        imu=imu_reading,
        attitude=attitude_reading,
        altitude_m=120.0,
        airspeed_mps=24.0,
        gps_health="lost",
    )


def test_valid_synchronized_packet_emits_vio_state() -> None:
    """A frame and telemetry sample sharing a timestamp yield a state packet."""
    # Arrange
    vio = LocalVioAdapter()
    frame = _frame()
    samples = (_telemetry(),)
    vio_input = VioInputPacket(frame=frame, telemetry_samples=samples)

    # Act
    outcome = vio.process(vio_input)

    # Assert
    assert outcome.error is None
    state = outcome.state_packet
    assert state is not None
    assert state.timestamp_ns == 1_000_000
    assert state.tracking_quality == 0.85
    assert outcome.health.state == "ready"


def test_timestamp_mismatch_is_explicit_validation_error() -> None:
    """Telemetry at 2_000_000 ns against a 1_000 ns tolerance is rejected."""
    # Arrange
    vio = LocalVioAdapter(timestamp_tolerance_ns=1_000)
    frame = _frame()
    late_samples = (_telemetry(2_000_000),)
    vio_input = VioInputPacket(frame=frame, telemetry_samples=late_samples)

    # Act
    outcome = vio.process(vio_input)

    # Assert
    assert outcome.state_packet is None
    failure = outcome.error
    assert failure is not None
    assert failure.component == "vio_adapter"
    assert failure.cause == "gap_exceeded"
    assert outcome.health.state == "degraded"


def test_tracking_loss_degrades_health_without_emitting_absolute_position() -> None:
    """A frame with quality 0.2 (threshold 0.35) degrades health.

    A state packet is still produced, and its dump carries neither a
    ``latitude_deg`` nor a ``longitude_deg`` key.
    """
    # Arrange
    vio = LocalVioAdapter(degraded_quality_threshold=0.35)
    weak_frame = _frame(quality=0.2)
    samples = (_telemetry(),)
    vio_input = VioInputPacket(frame=weak_frame, telemetry_samples=samples)

    # Act
    outcome = vio.process(vio_input)

    # Assert
    assert outcome.state_packet is not None
    assert outcome.health.state == "degraded"
    for absolute_key in ("latitude_deg", "longitude_deg"):
        assert absolute_key not in outcome.state_packet.model_dump()