mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 17:31:14 +00:00
94d2358c8b
Derkachi e2e Tier-2 divergence had three stacked root causes; this
commit ships fixes for all three plus the IMU prerequisite they
depend on, plus a baseline cheirality gate for cv2.recoverPose.
AZ-918 MAVLink IMU adapters now convert raw mG/mrad-s + FRD body to
SI m/s^2 + rad/s + FLU body via helpers.imu_units. Without
this the ESKF receives values ~1000x too small with wrong-
sign Y/Z and cannot function at all.
AZ-919 Composition root wires EskfNominalAltitudeProvider into the
KLT/RANSAC strategy via the AZ-331 factory introspect path;
OKVIS2 and VINS-Mono are unaffected.
AZ-920 KLT/RANSAC recovers metric translation via Ground Sampling
Distance when AGL is available; otherwise falls through with
scale_quality=direction_only/unknown (no fake scale invented).
AZ-921 VioOutput.scale_quality signal; ESKF add_vio adapts R_meas
position block based on the flag (1e6 inflation when scale is
direction_only/unknown to keep the filter consistent).
AZ-922 KLT/RANSAC cheirality gate rejects single-frame rotations
beyond a config threshold (default 30 deg), catching
cv2.recoverPose twisted-pair flips that cause immediate ESKF
divergence on low-parallax aerial scenes.
Verification:
- Tier-1 (macOS) unit suite: 2346 passed, 0 failed.
- Tier-2 (Jetson) Derkachi e2e: divergence moves from frame 5
(mahalanobis^2 3757) to frame 233 (mahalanobis^2 212). Remaining
drift is open-loop attitude accumulation, not cheirality.
Follow-up tickets filed:
- AZ-923 closed as misdiagnosed: EskfNominalAltitudeProvider was
already correct (nominal_pos.z IS the AGL when takeoff origin sits
at ground level); the early-frame AGL near zero reflects the drone
being stationary on the ground, not a provider bug.
- AZ-942 filed: cross-check VIO rotation against IMU preintegrator
(consistency gate) - more physically grounded than the coarse
AZ-922 threshold and likely required to absorb the frame-233 drift.
Co-authored-by: Cursor <cursoragent@cursor.com>
730 lines
20 KiB
Python
730 lines
20 KiB
Python
"""AZ-391 — C8 inbound subscription / decoder / rings / fan-out unit tests.
|
|
|
|
Covers all 10 ACs of AZ-391 + the named NFRs that can be verified
|
|
inside a unit test (the ≥ 200 Hz sustain budget belongs in C8-PT-01).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from types import SimpleNamespace
|
|
from typing import Any
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.fc import (
|
|
AttitudeSample,
|
|
FcTelemetryFrame,
|
|
FlightState,
|
|
FlightStateSignal,
|
|
GpsHealth,
|
|
GpsStatus,
|
|
ImuTelemetrySample,
|
|
TelemetryKind,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._inbound_mavlink import (
|
|
PymavlinkInboundDecoder,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._inbound_msp2 import (
|
|
Msp2InavInboundDecoder,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._subscription import (
|
|
SubscriptionBus,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import (
|
|
TelemetryRing,
|
|
)
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Helpers
|
|
|
|
|
|
def _ap_msg(msg_type: str, **fields: Any) -> Any:
|
|
"""Build a pymavlink-style stub message with `get_type()` + fields."""
|
|
ns = SimpleNamespace(**fields)
|
|
ns.get_type = lambda: msg_type
|
|
return ns
|
|
|
|
|
|
def _make_decoder(bus: SubscriptionBus | None = None) -> PymavlinkInboundDecoder:
|
|
if bus is None:
|
|
bus = SubscriptionBus()
|
|
src = mock.MagicMock()
|
|
return PymavlinkInboundDecoder(src, bus)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1: AP RAW_IMU decode
|
|
|
|
|
|
def test_ac1_ap_raw_imu_decode() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
msg = _ap_msg(
|
|
"RAW_IMU",
|
|
time_usec=12345,
|
|
xacc=10,
|
|
yacc=20,
|
|
zacc=-981,
|
|
xgyro=1,
|
|
ygyro=2,
|
|
zgyro=3,
|
|
)
|
|
|
|
# Act
|
|
dispatched = decoder.feed_one_message(msg)
|
|
|
|
# Assert
|
|
assert dispatched is True
|
|
assert len(received) == 1
|
|
frame = received[0]
|
|
assert frame.kind is TelemetryKind.IMU_SAMPLE
|
|
assert isinstance(frame.payload, ImuTelemetrySample)
|
|
# AZ-918: raw MAVLink (xacc=10, yacc=20, zacc=-981) mG FRD →
|
|
# SI/FLU m/s² via mavlink_imu_to_si_flu (negate Y, negate Z, *9.80665e-3).
|
|
# zacc raw is -981 mG; FRD→FLU negates Z, giving +981 mG = +9.62 m/s².
|
|
assert frame.payload.accel_xyz == pytest.approx((
|
|
10 * 9.80665e-3,
|
|
-20 * 9.80665e-3,
|
|
981 * 9.80665e-3,
|
|
))
|
|
assert frame.payload.gyro_xyz == pytest.approx((
|
|
1 * 1.0e-3,
|
|
-2 * 1.0e-3,
|
|
-3 * 1.0e-3,
|
|
))
|
|
# received_at is monotonic_ns at decode boundary (positive, non-zero)
|
|
assert frame.received_at > 0
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2: AP ATTITUDE decode
|
|
|
|
|
|
def test_ac2_ap_attitude_decode() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
msg = _ap_msg(
|
|
"ATTITUDE",
|
|
time_boot_ms=5000,
|
|
roll=0.1,
|
|
pitch=-0.2,
|
|
yaw=1.5,
|
|
rollspeed=0.0,
|
|
pitchspeed=0.0,
|
|
yawspeed=0.0,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(msg)
|
|
|
|
# Assert
|
|
assert len(received) == 1
|
|
assert received[0].kind is TelemetryKind.ATTITUDE
|
|
payload = received[0].payload
|
|
assert isinstance(payload, AttitudeSample)
|
|
assert payload.roll_rad == pytest.approx(0.1)
|
|
assert payload.pitch_rad == pytest.approx(-0.2)
|
|
assert payload.yaw_rad == pytest.approx(1.5)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3: AP GPS_RAW_INT -> GpsHealth fix-type mapping
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"fix_type, expected_status",
|
|
[
|
|
(0, GpsStatus.NO_FIX),
|
|
(1, GpsStatus.NO_FIX),
|
|
(2, GpsStatus.DEGRADED),
|
|
(3, GpsStatus.STABLE),
|
|
(4, GpsStatus.STABLE), # DGPS
|
|
(5, GpsStatus.STABLE), # RTK_FLOAT
|
|
(6, GpsStatus.STABLE), # RTK_FIXED
|
|
],
|
|
)
|
|
def test_ac3_ap_gps_fix_type_mapping(fix_type: int, expected_status: GpsStatus) -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
msg = _ap_msg(
|
|
"GPS_RAW_INT",
|
|
time_usec=0,
|
|
fix_type=fix_type,
|
|
lat=505000000,
|
|
lon=300000000,
|
|
alt=100000,
|
|
eph=0,
|
|
epv=0,
|
|
vel=0,
|
|
cog=0,
|
|
satellites_visible=8,
|
|
alt_ellipsoid=0,
|
|
h_acc=0,
|
|
v_acc=0,
|
|
vel_acc=0,
|
|
hdg_acc=0,
|
|
yaw=0,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(msg)
|
|
|
|
# Assert
|
|
assert isinstance(received[0].payload, GpsHealth)
|
|
assert received[0].payload.status is expected_status
|
|
|
|
|
|
def test_ac3_spoofing_sentinel_promotes_to_spoofed() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
statustext = _ap_msg("STATUSTEXT", severity=4, text="GPS spoofing detected!", id=0, chunk_seq=0)
|
|
gps_msg = _ap_msg(
|
|
"GPS_RAW_INT",
|
|
time_usec=0,
|
|
fix_type=3,
|
|
lat=505000000,
|
|
lon=300000000,
|
|
alt=100000,
|
|
eph=0,
|
|
epv=0,
|
|
vel=0,
|
|
cog=0,
|
|
satellites_visible=8,
|
|
alt_ellipsoid=0,
|
|
h_acc=0,
|
|
v_acc=0,
|
|
vel_acc=0,
|
|
hdg_acc=0,
|
|
yaw=0,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(statustext)
|
|
decoder.feed_one_message(gps_msg)
|
|
|
|
# Assert — the GPS frame after the sentinel is promoted to SPOOFED
|
|
gps_frames = [f for f in received if f.kind is TelemetryKind.GPS_HEALTH]
|
|
assert len(gps_frames) == 1
|
|
assert gps_frames[0].payload.status is GpsStatus.SPOOFED
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4: AP HEARTBEAT -> FlightState mapping
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"system_status, base_mode, expected_state",
|
|
[
|
|
(0, 0, FlightState.INIT), # UNINIT
|
|
(1, 0, FlightState.INIT), # BOOT
|
|
(2, 0, FlightState.INIT), # CALIBRATING
|
|
(3, 0x00, FlightState.ON_GROUND), # STANDBY, disarmed
|
|
(3, 0x80, FlightState.ARMED), # STANDBY, armed
|
|
(4, 0x80, FlightState.IN_FLIGHT), # ACTIVE
|
|
(5, 0, FlightState.FAILED), # CRITICAL
|
|
(6, 0, FlightState.FAILED), # EMERGENCY
|
|
(7, 0, FlightState.ON_GROUND), # POWEROFF
|
|
(8, 0, FlightState.FAILED), # FLIGHT_TERMINATION
|
|
],
|
|
)
|
|
def test_ac4_heartbeat_to_flight_state(
|
|
system_status: int, base_mode: int, expected_state: FlightState
|
|
) -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
msg = _ap_msg(
|
|
"HEARTBEAT",
|
|
type=0,
|
|
autopilot=3,
|
|
base_mode=base_mode,
|
|
custom_mode=0,
|
|
system_status=system_status,
|
|
mavlink_version=2,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(msg)
|
|
|
|
# Assert
|
|
assert received[0].kind is TelemetryKind.MAV_STATE
|
|
payload = received[0].payload
|
|
assert isinstance(payload, FlightStateSignal)
|
|
assert payload.state is expected_state
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5: iNav MSP2 decode (same DTOs as AP)
|
|
|
|
|
|
class _FakeMspSource:
|
|
def __init__(
|
|
self,
|
|
imu: dict | None = None,
|
|
attitude: dict | None = None,
|
|
gps: dict | None = None,
|
|
state: dict | None = None,
|
|
) -> None:
|
|
self._imu = imu
|
|
self._attitude = attitude
|
|
self._gps = gps
|
|
self._state = state
|
|
|
|
def read_imu(self) -> dict | None:
|
|
return self._imu
|
|
|
|
def read_attitude(self) -> dict | None:
|
|
return self._attitude
|
|
|
|
def read_gps(self) -> dict | None:
|
|
return self._gps
|
|
|
|
def read_flight_state(self) -> dict | None:
|
|
return self._state
|
|
|
|
|
|
def test_ac5_inav_msp2_imu_decode() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
source = _FakeMspSource(
|
|
imu={"accelerometer": [11, 22, -33], "gyroscope": [1, 2, 3]},
|
|
)
|
|
decoder = Msp2InavInboundDecoder(source, bus)
|
|
|
|
# Act
|
|
decoder.feed_one_tick()
|
|
|
|
# Assert
|
|
imu_frames = [f for f in received if f.kind is TelemetryKind.IMU_SAMPLE]
|
|
assert len(imu_frames) == 1
|
|
payload = imu_frames[0].payload
|
|
assert isinstance(payload, ImuTelemetrySample)
|
|
assert payload.accel_xyz == (11.0, 22.0, -33.0)
|
|
assert payload.gyro_xyz == (1.0, 2.0, 3.0)
|
|
# iNav is unsigned per RESTRICT-COMM-2
|
|
assert imu_frames[0].signed is False
|
|
|
|
|
|
def test_ac5_inav_msp2_attitude_decode() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
source = _FakeMspSource(attitude={"angx": 180.0, "angy": 0.0, "heading": 0.0})
|
|
decoder = Msp2InavInboundDecoder(source, bus)
|
|
|
|
# Act
|
|
decoder.feed_one_tick()
|
|
|
|
# Assert
|
|
att = [f for f in received if f.kind is TelemetryKind.ATTITUDE]
|
|
assert len(att) == 1
|
|
payload = att[0].payload
|
|
assert isinstance(payload, AttitudeSample)
|
|
# 180 deg -> pi rad
|
|
assert payload.roll_rad == pytest.approx(3.141592653589793)
|
|
|
|
|
|
def test_ac5_inav_spoofed_status_unreachable() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
source = _FakeMspSource(gps={"fix": 2, "lat": 505000000, "lon": 300000000, "alt": 100.0})
|
|
decoder = Msp2InavInboundDecoder(source, bus)
|
|
|
|
# Act
|
|
decoder.feed_one_tick()
|
|
|
|
# Assert — iNav with valid fix is STABLE; SPOOFED is unreachable
|
|
gps = [f for f in received if f.kind is TelemetryKind.GPS_HEALTH]
|
|
assert len(gps) == 1
|
|
assert gps[0].payload.status is GpsStatus.STABLE
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6: Bounded ring drop-oldest
|
|
|
|
|
|
def test_ac6_ring_drop_oldest_and_logs_first_overflow(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
# Arrange
|
|
ring: TelemetryRing[int] = TelemetryRing(capacity=100, kind_name="imu")
|
|
caplog.set_level(logging.INFO, logger="c8_fc_adapter.telemetry_ring")
|
|
|
|
# Act
|
|
for i in range(1000):
|
|
ring.push(i)
|
|
|
|
# Assert — ring contains the latest 100
|
|
items = ring.snapshot()
|
|
assert len(items) == 100
|
|
assert items == list(range(900, 1000))
|
|
# Dropped 900 items (the first 900 entered before any drops; once full,
|
|
# each subsequent push drops one).
|
|
assert ring.dropped_count == 900
|
|
# Exactly ONE overflow record (first overflow), not 900 of them.
|
|
overflow_logs = [
|
|
r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.ring_overflow"
|
|
]
|
|
assert len(overflow_logs) == 1
|
|
assert overflow_logs[0].kv["ring_kind"] == "imu"
|
|
|
|
|
|
def test_ac6_ring_capacity_zero_rejected() -> None:
|
|
# Act + Assert
|
|
with pytest.raises(ValueError, match=r"capacity must be > 0"):
|
|
TelemetryRing(capacity=0, kind_name="imu")
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7: Multi-subscriber fan-out
|
|
|
|
|
|
def test_ac7_multi_subscriber_fan_out_and_cancel() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
counters = [0, 0, 0]
|
|
|
|
def make_cb(idx: int):
|
|
def _cb(_f: FcTelemetryFrame) -> None:
|
|
counters[idx] += 1
|
|
|
|
return _cb
|
|
|
|
handles = [bus.subscribe(make_cb(i)) for i in range(3)]
|
|
frame = _make_frame(TelemetryKind.IMU_SAMPLE)
|
|
|
|
# Act 1 — all 3 receive
|
|
bus.dispatch(frame)
|
|
|
|
# Assert 1
|
|
assert counters == [1, 1, 1]
|
|
|
|
# Act 2 — cancel one, dispatch again
|
|
handles[1].cancel()
|
|
bus.dispatch(frame)
|
|
|
|
# Assert 2
|
|
assert counters == [2, 1, 2]
|
|
|
|
|
|
def test_ac7_subscriber_exception_does_not_kill_dispatch(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda _f: (_ for _ in ()).throw(RuntimeError("boom")))
|
|
bus.subscribe(lambda f: received.append(f))
|
|
caplog.set_level(logging.DEBUG, logger="c8_fc_adapter.subscription_bus")
|
|
frame = _make_frame(TelemetryKind.IMU_SAMPLE)
|
|
|
|
# Act
|
|
bus.dispatch(frame)
|
|
|
|
# Assert — second subscriber STILL fired
|
|
assert len(received) == 1
|
|
# And the bad subscriber's exception was DEBUG-logged
|
|
err_logs = [
|
|
r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.callback_error"
|
|
]
|
|
assert len(err_logs) == 1
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8: Warm-start hint surfaces within 1 s of first GPS_RAW_INT
|
|
|
|
|
|
def test_ac8_warm_start_hint_present_after_first_gps() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
decoder = _make_decoder(bus)
|
|
# Before any GPS: hint is None
|
|
assert decoder.warm_start_hint is None
|
|
|
|
# Act
|
|
msg = _ap_msg(
|
|
"GPS_RAW_INT",
|
|
time_usec=0,
|
|
fix_type=3,
|
|
lat=505000000,
|
|
lon=300000000,
|
|
alt=125000, # 50.5 / 30.0 / 125 m
|
|
eph=0,
|
|
epv=0,
|
|
vel=0,
|
|
cog=0,
|
|
satellites_visible=8,
|
|
alt_ellipsoid=0,
|
|
h_acc=0,
|
|
v_acc=0,
|
|
vel_acc=0,
|
|
hdg_acc=0,
|
|
yaw=0,
|
|
)
|
|
decoder.feed_one_message(msg)
|
|
|
|
# Assert
|
|
hint = decoder.warm_start_hint
|
|
assert hint is not None
|
|
assert hint.lat_deg == pytest.approx(50.5)
|
|
assert hint.lon_deg == pytest.approx(30.0)
|
|
assert hint.alt_m == pytest.approx(125.0)
|
|
|
|
|
|
def test_ac8_warm_start_hint_propagates_to_flight_state_signal() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
gps = _ap_msg(
|
|
"GPS_RAW_INT",
|
|
time_usec=0,
|
|
fix_type=3,
|
|
lat=505000000,
|
|
lon=300000000,
|
|
alt=125000,
|
|
eph=0,
|
|
epv=0,
|
|
vel=0,
|
|
cog=0,
|
|
satellites_visible=8,
|
|
alt_ellipsoid=0,
|
|
h_acc=0,
|
|
v_acc=0,
|
|
vel_acc=0,
|
|
hdg_acc=0,
|
|
yaw=0,
|
|
)
|
|
heartbeat = _ap_msg(
|
|
"HEARTBEAT",
|
|
type=0,
|
|
autopilot=3,
|
|
base_mode=0,
|
|
custom_mode=0,
|
|
system_status=3,
|
|
mavlink_version=2,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(gps)
|
|
decoder.feed_one_message(heartbeat)
|
|
|
|
# Assert
|
|
state_frames = [f for f in received if f.kind is TelemetryKind.MAV_STATE]
|
|
assert len(state_frames) == 1
|
|
payload = state_frames[0].payload
|
|
assert payload.last_valid_gps_hint_wgs84 is not None
|
|
assert payload.last_valid_gps_age_ms is not None
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9: Out-of-order drop + WARN
|
|
|
|
|
|
def test_ac9_out_of_order_dropped_with_warn(caplog: pytest.LogCaptureFixture) -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
caplog.set_level(logging.WARNING, logger="c8_fc_adapter.inbound_mavlink")
|
|
# First message
|
|
msg1 = _ap_msg(
|
|
"ATTITUDE",
|
|
time_boot_ms=1000,
|
|
roll=0.0,
|
|
pitch=0.0,
|
|
yaw=0.0,
|
|
rollspeed=0.0,
|
|
pitchspeed=0.0,
|
|
yawspeed=0.0,
|
|
)
|
|
decoder.feed_one_message(msg1)
|
|
# Capture the first frame's received_at; we'll force the next frame to be
|
|
# apparently older by injecting a manipulated `_last_ts_ns` entry.
|
|
# Force the decoder's per-kind last-ts way into the future so any next
|
|
# frame is "before" it.
|
|
decoder._last_ts_ns[TelemetryKind.ATTITUDE] = time.monotonic_ns() + 10_000_000_000
|
|
|
|
# Act
|
|
msg2 = _ap_msg(
|
|
"ATTITUDE",
|
|
time_boot_ms=2000,
|
|
roll=0.1,
|
|
pitch=0.1,
|
|
yaw=0.1,
|
|
rollspeed=0.0,
|
|
pitchspeed=0.0,
|
|
yawspeed=0.0,
|
|
)
|
|
decoder.feed_one_message(msg2)
|
|
|
|
# Assert
|
|
att_frames = [f for f in received if f.kind is TelemetryKind.ATTITUDE]
|
|
assert len(att_frames) == 1 # second frame dropped
|
|
warn_logs = [
|
|
r
|
|
for r in caplog.records
|
|
if getattr(r, "kind", None) == "c8.inbound.out_of_order_frame_dropped"
|
|
]
|
|
assert len(warn_logs) == 1
|
|
assert warn_logs[0].kv["telemetry_kind"] == "ATTITUDE"
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10: Decode-error isolation
|
|
|
|
|
|
def test_ac10_corrupt_frame_does_not_kill_decoder(
|
|
caplog: pytest.LogCaptureFixture,
|
|
) -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
received: list[FcTelemetryFrame] = []
|
|
bus.subscribe(lambda f: received.append(f))
|
|
decoder = _make_decoder(bus)
|
|
caplog.set_level(logging.DEBUG, logger="c8_fc_adapter.inbound_mavlink")
|
|
# Corrupt RAW_IMU — `xacc` field missing (AttributeError) on access
|
|
corrupt = SimpleNamespace()
|
|
corrupt.get_type = lambda: "RAW_IMU"
|
|
# Good RAW_IMU immediately after
|
|
good = _ap_msg(
|
|
"RAW_IMU",
|
|
time_usec=0,
|
|
xacc=1,
|
|
yacc=2,
|
|
zacc=3,
|
|
xgyro=4,
|
|
ygyro=5,
|
|
zgyro=6,
|
|
)
|
|
|
|
# Act
|
|
decoder.feed_one_message(corrupt)
|
|
decoder.feed_one_message(good)
|
|
|
|
# Assert — bad frame logged, good frame dispatched
|
|
err_logs = [r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.decode_error"]
|
|
assert len(err_logs) == 1
|
|
imu_frames = [f for f in received if f.kind is TelemetryKind.IMU_SAMPLE]
|
|
assert len(imu_frames) == 1
|
|
# AZ-918: raw MAVLink (xacc=1, yacc=2, zacc=3) mG FRD → SI/FLU.
|
|
assert imu_frames[0].payload.accel_xyz == pytest.approx((
|
|
1 * 9.80665e-3,
|
|
-2 * 9.80665e-3,
|
|
-3 * 9.80665e-3,
|
|
))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# NFR: IMU callback latency under 1 ms (loose budget for unit test)
|
|
|
|
|
|
def test_nfr_perf_imu_callback_under_1ms() -> None:
|
|
# Arrange
|
|
bus = SubscriptionBus()
|
|
bus.subscribe(lambda _f: None)
|
|
decoder = _make_decoder(bus)
|
|
|
|
iters = 200
|
|
msgs = [
|
|
_ap_msg(
|
|
"RAW_IMU",
|
|
time_usec=i,
|
|
xacc=i,
|
|
yacc=i,
|
|
zacc=i,
|
|
xgyro=i,
|
|
ygyro=i,
|
|
zgyro=i,
|
|
)
|
|
for i in range(iters)
|
|
]
|
|
|
|
# Act
|
|
start = time.perf_counter()
|
|
for m in msgs:
|
|
decoder.feed_one_message(m)
|
|
avg_s = (time.perf_counter() - start) / iters
|
|
|
|
# Assert
|
|
assert avg_s < 1e-3, f"avg {avg_s * 1e3:.3f}ms > 1ms budget"
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# NFR: Thread-safety smoke (single-producer / single-consumer over a ring)
|
|
|
|
|
|
def test_nfr_ring_thread_safety_smoke() -> None:
|
|
# Arrange
|
|
ring: TelemetryRing[int] = TelemetryRing(capacity=1024, kind_name="imu")
|
|
n = 5000
|
|
|
|
def producer() -> None:
|
|
for i in range(n):
|
|
ring.push(i)
|
|
|
|
def consumer_snapshots() -> None:
|
|
for _ in range(n):
|
|
ring.snapshot()
|
|
|
|
# Act
|
|
threads = [
|
|
threading.Thread(target=producer),
|
|
threading.Thread(target=consumer_snapshots),
|
|
]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Assert — no crash; ring still has bounded size
|
|
assert len(ring) <= 1024
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Helpers (factories for test frames)
|
|
|
|
|
|
def _make_frame(kind: TelemetryKind) -> FcTelemetryFrame:
|
|
if kind is TelemetryKind.IMU_SAMPLE:
|
|
payload: Any = ImuTelemetrySample(
|
|
ts_ns=0, accel_xyz=(0.0, 0.0, 0.0), gyro_xyz=(0.0, 0.0, 0.0)
|
|
)
|
|
elif kind is TelemetryKind.ATTITUDE:
|
|
payload = AttitudeSample(ts_ns=0, roll_rad=0.0, pitch_rad=0.0, yaw_rad=0.0)
|
|
elif kind is TelemetryKind.GPS_HEALTH:
|
|
payload = GpsHealth(status=GpsStatus.STABLE, fix_age_ms=0, captured_at=0)
|
|
else:
|
|
payload = FlightStateSignal(
|
|
state=FlightState.INIT,
|
|
last_valid_gps_hint_wgs84=None,
|
|
last_valid_gps_age_ms=None,
|
|
captured_at=0,
|
|
)
|
|
return FcTelemetryFrame(kind=kind, payload=payload, received_at=0, signed=False)
|