mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 10:31:13 +00:00
087f4dba27
Co-authored-by: Cursor <cursoragent@cursor.com>
74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
from shared.contracts import FramePacket, TelemetrySample
|
|
from vio_adapter import LocalVioAdapter, VioInputPacket
|
|
|
|
|
|
def _frame(**overrides: object) -> FramePacket:
    """Return a ``FramePacket`` validated from baseline fields plus overrides.

    Any keyword argument replaces (or adds to) the baseline field of the
    same name before the mapping is handed to ``FramePacket.model_validate``.
    """
    baseline: dict[str, object] = dict(
        frame_id="frame-1",
        timestamp_ns=1_000_000,
        image_ref="replay/frame-1.jpg",
        calibration_id="calib-1",
        occlusion="clear",
        quality=0.85,
    )
    # Later entries win, so caller-supplied overrides shadow the baseline.
    merged: dict[str, object] = {**baseline, **overrides}
    return FramePacket.model_validate(merged)
|
|
|
|
|
|
def _telemetry(timestamp_ns: int = 1_000_000) -> TelemetrySample:
    """Return a ``TelemetrySample`` stamped with ``timestamp_ns``.

    Every other field is a fixed test value; ``gps_health`` is always
    ``"lost"``.
    """
    imu_reading = {"accel_x": 0.1, "accel_y": 0.0, "accel_z": 9.8}
    attitude_reading = {"roll": 0.0, "pitch": 0.01, "yaw": 0.02}
    sample = TelemetrySample(
        timestamp_ns=timestamp_ns,
        imu=imu_reading,
        attitude=attitude_reading,
        altitude_m=120.0,
        airspeed_mps=24.0,
        gps_health="lost",
    )
    return sample
|
|
|
|
|
|
def test_valid_synchronized_packet_emits_vio_state() -> None:
    """Default frame and default telemetry yield a state packet and no error."""
    # Arrange: both helpers default to timestamp_ns == 1_000_000.
    vio = LocalVioAdapter()
    vio_input = VioInputPacket(
        frame=_frame(),
        telemetry_samples=(_telemetry(),),
    )

    # Act
    outcome = vio.process(vio_input)

    # Assert
    assert outcome.error is None
    emitted = outcome.state_packet
    assert emitted is not None
    assert emitted.timestamp_ns == 1_000_000
    assert emitted.tracking_quality == 0.85
    assert outcome.health.state == "ready"
|
|
|
|
|
|
def test_timestamp_mismatch_is_explicit_validation_error() -> None:
    """Telemetry stamped far outside the tolerance is reported as an error."""
    # Arrange: frame at 1_000_000 ns, telemetry at 2_000_000 ns, tolerance 1_000 ns.
    vio = LocalVioAdapter(timestamp_tolerance_ns=1_000)
    late_sample = _telemetry(2_000_000)
    vio_input = VioInputPacket(frame=_frame(), telemetry_samples=(late_sample,))

    # Act
    outcome = vio.process(vio_input)

    # Assert
    assert outcome.state_packet is None
    failure = outcome.error
    assert failure is not None
    assert failure.component == "vio_adapter"
    assert failure.cause == "gap_exceeded"
    assert outcome.health.state == "degraded"
|
|
|
|
|
|
def test_tracking_loss_degrades_health_without_emitting_absolute_position() -> None:
    """A low-quality frame still emits state, flags degraded health, no lat/lon."""
    # Arrange: frame quality 0.2 with the adapter's degraded threshold at 0.35.
    vio = LocalVioAdapter(degraded_quality_threshold=0.35)
    poor_frame = _frame(quality=0.2)
    vio_input = VioInputPacket(frame=poor_frame, telemetry_samples=(_telemetry(),))

    # Act
    outcome = vio.process(vio_input)

    # Assert
    emitted = outcome.state_packet
    assert emitted is not None
    assert outcome.health.state == "degraded"
    dumped_fields = emitted.model_dump()
    # The dumped state must not contain absolute-position keys.
    for absolute_key in ("latitude_deg", "longitude_deg"):
        assert absolute_key not in dumped_fields
|