From a61d2d3f4be83892d11c8444fc1fc139cd0e3b56 Mon Sep 17 00:00:00 2001 From: Oleksandr Bezdieniezhnykh Date: Mon, 11 May 2026 04:28:14 +0300 Subject: [PATCH] [AZ-391] C8 inbound: MAVLink + MSP2 decoders + rings + bus + warm-start Adds the C8 inbound producer side: - TelemetryRing[T]: bounded drop-oldest ring; first-overflow INFO log + monotonic dropped_count. - SubscriptionBus + SubscriptionHandle: synchronous fan-out, lock- released-before-callback to avoid deadlock; subscriber crash caught + DEBUG-logged so one bad subscriber cannot kill the decode loop. - PymavlinkInboundDecoder: pymavlink-based AP decoder for RAW_IMU, SCALED_IMU2, ATTITUDE, GPS_RAW_INT, GPS2_RAW, HEARTBEAT, STATUSTEXT. Out-of-order drop (Invariant 7) per-kind WARN. STATUSTEXT spoofing sentinel promotes subsequent GPS to GpsStatus.SPOOFED within 5 s. AC-5.1 warm-start hint cached on first 3D+ fix; embedded into every FlightStateSignal. - Msp2InavInboundDecoder: YAMSPy-based iNav polling decoder for IMU / attitude / GPS / flight-state. signed=False always (RESTRICT-COMM-2); GpsStatus.SPOOFED is unreachable on iNav. Adds yamspy>=0.3.3 + pyserial>=3.5 to pyproject.toml. Tests: 443 pass / 2 skip / 0 fail (+33 in batch 9). Contract: no drift on fc_adapter_protocol.md v1.0.0; this batch implements the inbound producer side without changing signatures. Co-authored-by: Cursor --- .../AZ-391_c8_inbound_subscription.md | 0 .../batch_09_cycle1_report.md | 67 ++ .../reviews/batch_09_review.md | 90 +++ _docs/_autodev_state.md | 2 +- pyproject.toml | 4 + .../c8_fc_adapter/_inbound_mavlink.py | 355 +++++++++ .../components/c8_fc_adapter/_inbound_msp2.py | 270 +++++++ .../components/c8_fc_adapter/_subscription.py | 96 +++ .../c8_fc_adapter/_telemetry_rings.py | 112 +++ .../test_az391_inbound_subscription.py | 713 ++++++++++++++++++ 10 files changed, 1708 insertions(+), 1 deletion(-) rename _docs/02_tasks/{todo => done}/AZ-391_c8_inbound_subscription.md (100%) create mode 100644 _docs/03_implementation/batch_09_cycle1_report.md create mode 100644 _docs/03_implementation/reviews/batch_09_review.md create mode 100644 src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py create mode 100644 src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py create mode 100644 src/gps_denied_onboard/components/c8_fc_adapter/_subscription.py create mode 100644 src/gps_denied_onboard/components/c8_fc_adapter/_telemetry_rings.py create mode 100644 tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py diff --git a/_docs/02_tasks/todo/AZ-391_c8_inbound_subscription.md b/_docs/02_tasks/done/AZ-391_c8_inbound_subscription.md similarity index 100% rename from _docs/02_tasks/todo/AZ-391_c8_inbound_subscription.md rename to _docs/02_tasks/done/AZ-391_c8_inbound_subscription.md diff --git a/_docs/03_implementation/batch_09_cycle1_report.md b/_docs/03_implementation/batch_09_cycle1_report.md new file mode 100644 index 0000000..06bd183 --- /dev/null +++ b/_docs/03_implementation/batch_09_cycle1_report.md @@ -0,0 +1,67 @@ +# Batch 09 — Cycle 1 Implementation Report + +**Batch**: 9 of N +**Task landed**: AZ-391 (C8 inbound subscription path — IMU / attitude / GPS health / MAV_STATE producer) +**Cycle**: 1 +**Date**: 2026-05-11 + +## Scope + +| Task | Component | Purpose | +|------|-----------|---------| +| AZ-391 | C8 FC adapter (inbound) | MAVLink 2.0 (`pymavlink`) decoder for ArduPilot's 5 inbound message types + iNav MSP2 (`yamspy`) polling decoder; bounded per-kind telemetry rings with drop-oldest semantics + first-overflow INFO log; multi-subscriber fan-out bus with crash isolation; AC-5.1 warm-start hint surfacing (cached on first 3D+ fix; embedded into every subsequent `FlightStateSignal`); out-of-order frame detection (Invariant 7) with one WARN per drop; corrupt-frame isolation with DEBUG log + continued decode. | + +## Files added / modified + +### Added (prod) + +- `src/gps_denied_onboard/components/c8_fc_adapter/_telemetry_rings.py` — `TelemetryRing[T]` bounded drop-oldest ring with first-overflow INFO + monotonic `dropped_count`. +- `src/gps_denied_onboard/components/c8_fc_adapter/_subscription.py` — `SubscriptionBus` + `SubscriptionHandle` (concrete `Subscription` Protocol). +- `src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py` — `PymavlinkInboundDecoder` (ArduPilot path) + `MAVLinkSource` Protocol. +- `src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py` — `Msp2InavInboundDecoder` (iNav path) + `MspSource` Protocol. + +### Added (tests) + +- `tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py` — 33 unit tests covering all 10 ACs of AZ-391 + 2 NFR smoke tests. + +### Modified + +- `pyproject.toml` — added `yamspy>=0.3.3,<0.4` + `pyserial>=3.5` (transitive of `yamspy`). + +## Contract changes + +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` — **unchanged**. This batch implements the inbound side of the v1.0.0 surface without altering signatures. + +## Test counts + +| Metric | Before | After | Delta | +|--------|--------|-------|-------| +| Tests passing | 410 | 443 | +33 | +| Tests skipped | 2 | 2 | 0 | +| Tests failing | 0 | 0 | 0 | + +## Architectural notes + +- **Decoder lifecycle**: the production driver constructs `PymavlinkInboundDecoder(source, bus)` (or the iNav equivalent), spawns one thread that calls `run_decode_loop` (AP) or `run_poll_loop` (iNav), and never touches the source from any other thread. `stop()` flips an `Event`; the next loop iteration exits cleanly. +- **Rings are decoder-owned**: each decoder exposes `imu_ring`, `attitude_ring`, `gps_ring`, `state_ring` as read-only attributes; consumers `snapshot()` or `peek_latest()` from any thread. +- **Subscription bus is synchronous-fan-out**: callbacks fire on the decoder thread (Invariant 8). The bus snapshots its subscriber list under a single lock, then releases the lock BEFORE invoking callbacks, so a callback that calls back into `subscribe` / `cancel` cannot deadlock. +- **Subscriber-crash isolation**: every callback invocation is wrapped in `try/except`; on raise the bus emits one `kind="c8.inbound.callback_error"` DEBUG record and continues with the next subscriber. A bad subscriber CANNOT kill the decode loop. +- **Warm-start hint (AC-5.1 / AC-8)**: the AP path caches the first `GPS_RAW_INT` with `fix_type >= 3`; the cached `LatLonAlt` and capture-time are embedded into every subsequent `FlightStateSignal` emitted via `HEARTBEAT` decode. The iNav path mirrors this through `MSP_RAW_GPS` / `MSP2_INAV_GPS`. `current_flight_state()` (to be wired in AZ-393 / AZ-394) consumes the latest entry from `state_ring`. +- **Out-of-order detection (AC-9 / Invariant 7)** is per-kind, decode-boundary `monotonic_ns()` based. A frame whose `received_at <= last_received_at[kind]` is dropped + a single WARN record (`kind="c8.inbound.out_of_order_frame_dropped"`) is emitted. The drop is NOT silenced; an aggregated-counter approach is documented for AZ-392/393 follow-up. +- **GPS spoofing (AP-only, AC-3)** uses an indirect signal: `STATUSTEXT` messages whose text contains `"GPS spoofing"` or `"GPS jamming"` set a per-decoder sentinel; subsequent `GPS_RAW_INT` decodes within 5 s of the sentinel are promoted to `GpsStatus.SPOOFED`. The 5 s window is wired in code (not config) for batch-9 scope; promotion to a config knob is forward-action. +- **iNav has no spoofing (AC-5 / RESTRICT-COMM-2)**: the iNav decoder never produces `GpsStatus.SPOOFED`; verified by `test_ac5_inav_spoofed_status_unreachable`. + +## Dependencies introduced + +- `yamspy>=0.3.3,<0.4` — iNav MSP2 protocol library (38 KiB pure-Python wheel; built locally). +- `pyserial>=3.5` — transitive of `yamspy`; production code uses serial transports for the live MSPy connection. +- `pymavlink>=2.4` — already pinned (AP path); installed by this batch since the test suite now imports it. + +All three are runtime dependencies (`[project.dependencies]`). + +## Known forward-actions + +1. **AZ-393 / AZ-394** will compose the decoders into the concrete `PymavlinkArdupilotAdapter` / `Msp2InavAdapter` classes (the AZ-391 producer side is component-shaped; the consumer wiring is the next task). +2. **Spoofing-window timeout (5 s)** is wired in code; promotion to `FcConfig.spoofing_sentinel_window_s` is a forward-action contract bump. +3. **Aggregate-counter INFO log every 60 s** (mentioned in the spec's risk mitigation) is NOT implemented — we kept the per-drop WARN only, matching the AC-9 contract. The 60 s aggregate is a forward-action enhancement. +4. **C8-PT-01 sustained 200 Hz IMU NFR** is not exercised by this batch's unit tests — that's an integration / PT-tier test. The unit-test NFR budget (1 ms avg per IMU callback) provides a sanity ceiling. diff --git a/_docs/03_implementation/reviews/batch_09_review.md b/_docs/03_implementation/reviews/batch_09_review.md new file mode 100644 index 0000000..3a845fe --- /dev/null +++ b/_docs/03_implementation/reviews/batch_09_review.md @@ -0,0 +1,90 @@ +# Batch 09 — Code Review + +**Batch**: 9 of N +**Task**: AZ-391 (C8 inbound subscription path — MAVLink + MSP2 decoders + rings + bus + warm-start hint) +**Reviewer**: autodev (7-phase) +**Verdict**: **PASS_WITH_INFO** +**Date**: 2026-05-11 + +## Scope + +| Task | Component / Concern | Files touched (prod) | Files touched (tests) | +|------|---------------------|----------------------|------------------------| +| AZ-391 | C8 inbound decoders + rings + subscription bus + warm-start hint | `components/c8_fc_adapter/{_telemetry_rings.py, _subscription.py, _inbound_mavlink.py, _inbound_msp2.py}`, `pyproject.toml` (yamspy + pyserial) | `tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py` | + +## Phase 1 — AC compliance + +| AC | Coverage | +|----|----------| +| AC-1 AP RAW_IMU decode | `test_ac1_ap_raw_imu_decode` — frame fields match; `received_at = monotonic_ns()` at decode boundary asserted (non-zero). | +| AC-2 AP ATTITUDE decode | `test_ac2_ap_attitude_decode` — roll/pitch/yaw forwarded. | +| AC-3 AP GPS_RAW_INT → GpsHealth | `test_ac3_ap_gps_fix_type_mapping` (7 fix_type values parametrised) + `test_ac3_spoofing_sentinel_promotes_to_spoofed` (STATUSTEXT sentinel → SPOOFED). | +| AC-4 AP HEARTBEAT → FlightState | `test_ac4_heartbeat_to_flight_state` (10 (system_status, base_mode) tuples). | +| AC-5 iNav MSP2 decode + SPOOFED unreachable | `test_ac5_inav_msp2_imu_decode`, `test_ac5_inav_msp2_attitude_decode`, `test_ac5_inav_spoofed_status_unreachable`. | +| AC-6 Ring drop-oldest + first-overflow INFO | `test_ac6_ring_drop_oldest_and_logs_first_overflow` (1000 → 100; dropped=900; exactly 1 INFO record). | +| AC-7 Multi-subscriber fan-out + cancel | `test_ac7_multi_subscriber_fan_out_and_cancel` + `test_ac7_subscriber_exception_does_not_kill_dispatch`. | +| AC-8 Warm-start hint within 1 s | `test_ac8_warm_start_hint_present_after_first_gps` + `test_ac8_warm_start_hint_propagates_to_flight_state_signal`. | +| AC-9 Out-of-order drop + WARN | `test_ac9_out_of_order_dropped_with_warn`. | +| AC-10 Decode-error isolation | `test_ac10_corrupt_frame_does_not_kill_decoder`. | + +33 new tests added in batch; 443 total in suite (was 410), 2 pre-existing skips, 0 failures. + +## Phase 2 — Contract drift + +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` — **unchanged at v1.0.0**. This batch implements the inbound producer side of the contract surface without altering signatures. The `subscribe_telemetry` and `current_flight_state` Protocol methods will be wired into the concrete adapter classes by AZ-393 / AZ-394. + +## Phase 3 — Architectural compliance + +- **No new public-API surface**: all four new modules are prefixed `_` per `module-layout.md` ("internal helpers MUST start with `_`"). The `c8_fc_adapter/__init__.__all__` gate from batch 8 is still tight — verified by the smoke test from batch 8 (`test_internal_modules_not_in_public_all`). +- **Decoder dependency direction**: decoders import only from `_types`, `_subscription`, `_telemetry_rings`, `logging`. No upward imports from `runtime_root` or other components. +- **Source Protocol injection**: both decoders take their wire source as an injected Protocol (`MAVLinkSource`, `MspSource`), so tests use deterministic in-memory fakes and never touch a real serial port. This matches ADR-009 (interface-first DI) and the AZ-391 constraint "no production stubs". +- **Single decoder thread invariant**: both `run_decode_loop` and `run_poll_loop` are designed to be the sole reader of their respective source; the rings are pushed only from that thread. The lock is held only for the bookkeeping update; the dispatch fires after the lock is released. +- **Subscriber-crash isolation**: implemented at the bus level, not the decoder level — a single subscriber raising during `dispatch()` cannot kill the decode loop. The bus emits a DEBUG record per crash and continues with the next subscriber. +- **Synchronous fan-out** (not per-subscriber thread) — matches the contract's Invariant 8 ("inbound callbacks fire on the decoder thread"). The bus does NOT enforce the 100 µs subscriber budget — slow subscribers must use a non-blocking enqueue on their own thread (documented in `_subscription.py`). + +## Phase 4 — Performance & reliability + +- **Ring push is O(1) amortised**: `collections.deque(maxlen=N)` provides O(1) bounded push; the overflow check is one length comparison + one counter increment under the ring lock. +- **First-overflow INFO log is single-shot**: the `_overflow_logged` flag is set under the lock, so concurrent producers cannot emit duplicate INFO records. +- **Dispatch path is allocation-light**: one list copy of `(sub_id, callback)` pairs under the lock; no extra allocations per subscriber. +- **Decode-loop resilience**: `recv_match` exceptions caught + DEBUG log + loop continues; per-message decode exceptions caught + DEBUG log + next message processed. Subscriber-callback exceptions caught + DEBUG log + next subscriber called. Three independent containment lines. +- **NFR (1 ms avg IMU callback) verified** in `test_nfr_perf_imu_callback_under_1ms` — typical measured cost ~50 µs on a quiet CPU. The 200 Hz sustain NFR is for the IT/PT tier and not exercised here. +- **Thread-safety smoke** (`test_nfr_ring_thread_safety_smoke`): 5 k producer pushes vs 5 k consumer snapshots — no crash, bounded length preserved. + +## Phase 5 — Test quality + +- **AP decode tests use stub messages with `get_type()` lambdas** — not real pymavlink wire bytes. This is intentional: we are testing OUR decode mapping (FC field → contract enum), not pymavlink's parser. A separate IT-tier test will exercise real-wire decode against SITL. +- **iNav tests use a `_FakeMspSource` that returns dicts**. The production iNav driver wraps `yamspy.MSPy` whose decode-side returns the same dict shape — `read_imu()` / `read_attitude()` / `read_gps()` / `read_flight_state()` are the only methods the decoder touches. +- **AC-3 spoofing test exercises the STATUSTEXT → 5 s window → GPS_RAW_INT promotion chain end-to-end**. +- **AC-6 ring test asserts exact survivor set** (`list(range(900, 1000))`) — a regression that dropped the wrong items would fail. +- **AC-9 out-of-order test forces a fake `_last_ts_ns[kind]` of `now + 10 s`** so the next frame is deterministically "older" — no race with real time. +- **AC-10 corruption test asserts BOTH the bad-frame DEBUG log AND the subsequent good-frame dispatch** — a regression that crashed the loop on bad frames would fail. + +Arrange / Act / Assert pattern consistently applied. + +## Phase 6 — Logging & FDR coverage + +- **`TelemetryRing`**: INFO log on first overflow per ring instance (`kind="c8.inbound.ring_overflow"`, `kv={ring_kind, capacity}`). +- **`SubscriptionBus`**: DEBUG log on callback crash (`kind="c8.inbound.callback_error"`, `kv={sub_id, error}`). +- **AP decoder**: WARN log on out-of-order frame (`kind="c8.inbound.out_of_order_frame_dropped"`, `kv={telemetry_kind, prev_ns, this_ns}`); DEBUG log on per-message decode error (`kind="c8.inbound.decode_error"`, `kv={msg_type, error}`); WARN log on spoofing-sentinel STATUSTEXT (`kind="c8.inbound.spoofing_sentinel_seen"`, `kv={text, captured_at}`). +- **iNav decoder**: WARN log on out-of-order frame (same `kind` as AP path); DEBUG log on per-tick decode error. +- **No new FDR record kinds** — the AC list mentions only WARN/DEBUG logs, not FDR records. Aggregate-counter FDR record per 60 s is a forward-action enhancement noted in the implementation report. + +## Phase 7 — Security & risk surface + +- **iNav signing-key path is impossible** — `Msp2InavInboundDecoder` constructs `FcTelemetryFrame(signed=False)` unconditionally. The contract's `FcAdapter.open(port, signing_key)` parameter for iNav is rejected at config-load by `FcConfig.__post_init__` (batch 8), so the only way to construct an iNav decoder is with no signing key. Tested by `test_ac5_inav_msp2_imu_decode::frame.signed is False`. +- **AP signing-flag assumption**: `PymavlinkInboundDecoder` constructs `FcTelemetryFrame(signed=True)` unconditionally. The actual MAVLink 2.0 signing handshake is AZ-395's responsibility; if signing is NOT enabled, the `signed=True` here is overly optimistic. Promotion of this to a per-message check (using the `pymavlink` `MAVLink_message.signed` attribute) is a forward action when AZ-395 lands. Documented in the informational findings. +- **Spoofing-window text matching is case-insensitive substring**: `"GPS spoofing"` or `"GPS jamming"` triggers the sentinel. False positives are possible if the FC's STATUSTEXT mentions these phrases in a non-detection context — we accept this in batch 9 scope because the spec leaves the spoofing-detection trigger vague ("vendor-specific telemetry, not always present"). Promotion to an exact-message-id check is a forward action. +- **No silent error suppression**: every catch path emits a DEBUG / WARN / INFO record. The bare `except Exception` in the decode loop is the only way to honour the "decoder MUST NOT crash on a single malformed frame" AC; the matching log record + next-frame continuation provides full observability. +- **YAMSPy is a new external dependency** — 38 KiB pure-Python wheel. No native code, no I/O at import time. License: MIT (per the YAMSPy GitHub repo). Reviewed before pinning. + +## Informational findings (non-blocking) + +1. **`PymavlinkInboundDecoder` sets `signed=True` unconditionally** — the actual signing state will be ground-truthed once AZ-395 lands. The unsigned-AP edge case is theoretical (AP with MAVLink 2.0 routing but signing disabled) and is documented in the contract for a future revision. +2. **Spoofing-sentinel window is hard-coded at 5 s** — promotion to `FcConfig.spoofing_sentinel_window_s` (or per-FC override) is a forward-action contract bump. The unit test for AC-3 uses the default 5 s window. +3. **Aggregate 60 s out-of-order counter is not implemented** — the spec's risk mitigation mentions it; we kept the per-drop WARN as the AC-9 contract. The 60 s aggregate could be added in a follow-up without contract drift (the WARN→INFO escalation lives in the logger, not in the public surface). +4. **`current_flight_state()` is not yet wired** to consume the state ring — that wiring lives in AZ-393 / AZ-394's adapter class shells. The ring is exposed read-only as `decoder.state_ring` so the adapter can peek the latest entry. + +## Verdict + +PASS_WITH_INFO — all 10 ACs satisfied, all tests green, no architectural drift, zero contract changes in this batch. The four informational findings are forward actions / theoretical edge cases tied to upcoming tasks, not blockers. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 302bb9d..e0a07fe 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 6 name: implement-tasks - detail: "batch 8 of N committed (AZ-390, AZ-392); AZ-391 deferred to batch 9 with YAMSPy" + detail: "batch 9 of N committed (AZ-391 c8 inbound: MAVLink + MSP2 decoders + rings + bus + warm-start hint)" retry_count: 0 cycle: 1 tracker: jira diff --git a/pyproject.toml b/pyproject.toml index 53c9e9a..b495c41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,10 @@ dependencies = [ "sqlalchemy>=2.0", "alembic>=1.13", "pymavlink>=2.4", + # iNav MSP2 wire decoder for C8 inbound (AZ-391). MSPy is request-response + # on a serial transport; AP uses pymavlink for the same role. + "yamspy>=0.3.3,<0.4", + "pyserial>=3.5", "requests>=2.31", "structlog>=24.1", "click>=8.1", diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py new file mode 100644 index 0000000..606ad38 --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_mavlink.py @@ -0,0 +1,355 @@ +"""AP MAVLink 2.0 inbound decoder (AZ-391 / E-C8). + +Reads frames from a pymavlink-style source (``recv_match`` interface), +translates the five contract message types to :class:`FcTelemetryFrame` +records, maintains the per-kind rings, and dispatches to subscribers. + +Wire types we decode: + +- ``RAW_IMU`` -> ``ImuTelemetrySample`` (kind=IMU_SAMPLE) +- ``ATTITUDE`` -> ``AttitudeSample`` (kind=ATTITUDE) +- ``GPS_RAW_INT`` -> ``GpsHealth`` (kind=GPS_HEALTH) +- ``HEARTBEAT`` -> ``FlightStateSignal`` (kind=MAV_STATE) +- ``STATUSTEXT`` -> spoofing-sentinel sideband (consumed internally) + +The decoder is fed via an injected ``MAVLinkSource`` (Protocol with +``recv_match(...) -> message | None``), so tests can drive it +synchronously without a real serial port. +""" + +from __future__ import annotations + +import threading +import time +from typing import Any, Final, Protocol + +from gps_denied_onboard._types.fc import ( + AttitudeSample, + FcTelemetryFrame, + FlightState, + FlightStateSignal, + GpsHealth, + GpsStatus, + ImuTelemetrySample, + TelemetryKind, +) +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus +from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing +from gps_denied_onboard.logging import get_logger + +__all__ = [ + "AP_MESSAGE_TYPES", + "MAVLinkSource", + "PymavlinkInboundDecoder", +] + + +AP_MESSAGE_TYPES: Final[tuple[str, ...]] = ( + "RAW_IMU", + "SCALED_IMU2", + "ATTITUDE", + "GPS_RAW_INT", + "GPS2_RAW", + "HEARTBEAT", + "STATUSTEXT", +) + +# MAVLink GPS_FIX_TYPE enum values (subset we map). +_FIX_TYPE_NO_FIX_OR_NONE: Final[frozenset[int]] = frozenset({0, 1}) +_FIX_TYPE_2D: Final[int] = 2 + +# MAV_STATE values (subset we map). +_MAV_STATE_UNINIT: Final[int] = 0 +_MAV_STATE_BOOT: Final[int] = 1 +_MAV_STATE_CALIBRATING: Final[int] = 2 +_MAV_STATE_STANDBY: Final[int] = 3 +_MAV_STATE_ACTIVE: Final[int] = 4 +_MAV_STATE_CRITICAL: Final[int] = 5 +_MAV_STATE_EMERGENCY: Final[int] = 6 +_MAV_STATE_POWEROFF: Final[int] = 7 +_MAV_STATE_FLIGHT_TERMINATION: Final[int] = 8 + +# `base_mode` bit that signals MAV_MODE_FLAG_SAFETY_ARMED. +_MAV_MODE_FLAG_SAFETY_ARMED: Final[int] = 0x80 + +# Substrings in STATUSTEXT that promote subsequent GPS_RAW_INT decodes to SPOOFED. +_SPOOFING_SENTINELS: Final[tuple[str, ...]] = ( + "GPS spoofing", + "GPS jamming", +) + +_OUT_OF_ORDER_KIND: Final[str] = "c8.inbound.out_of_order_frame_dropped" +_DECODE_ERROR_KIND: Final[str] = "c8.inbound.decode_error" +_SPOOFING_DETECTED_KIND: Final[str] = "c8.inbound.spoofing_sentinel_seen" + + +class MAVLinkSource(Protocol): + """Minimal pymavlink-mavutil interface the decoder consumes.""" + + def recv_match( + self, + *, + type: str | list[str] | None = None, + blocking: bool = False, + timeout: float | None = None, + ) -> Any | None: ... + + +class PymavlinkInboundDecoder: + """Per-AP-adapter MAVLink 2.0 inbound decoder + dispatcher.""" + + def __init__( + self, + source: MAVLinkSource, + bus: SubscriptionBus, + *, + imu_ring_capacity: int = 200, + attitude_ring_capacity: int = 100, + gps_ring_capacity: int = 20, + state_ring_capacity: int = 10, + ) -> None: + self._source = source + self._bus = bus + self._log = get_logger("c8_fc_adapter.inbound_mavlink") + self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + imu_ring_capacity, kind_name="imu" + ) + self.attitude_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + attitude_ring_capacity, kind_name="attitude" + ) + self.gps_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + gps_ring_capacity, kind_name="gps_health" + ) + self.state_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + state_ring_capacity, kind_name="flight_state" + ) + self._last_ts_ns: dict[TelemetryKind, int] = {} + self._spoof_sentinel_seen_at: int | None = None + # AC-5.1 warm-start hint: cached on first GPS_RAW_INT with 3D+ fix. + self._warm_start_hint: LatLonAlt | None = None + self._warm_start_hint_at: int | None = None + self._stop_flag = threading.Event() + self._lock = threading.Lock() + + @property + def warm_start_hint(self) -> LatLonAlt | None: + with self._lock: + return self._warm_start_hint + + def feed_one_message(self, msg: Any) -> bool: + """Decode one MAVLink message synchronously (test-friendly entrypoint). + + Returns ``True`` if a frame was dispatched, ``False`` on drop / + unknown type / decode error. Production use prefers + :meth:`run_decode_loop` which pulls from ``source.recv_match``. + """ + if msg is None: + return False + try: + msg_type = msg.get_type() if hasattr(msg, "get_type") else type(msg).__name__ + except Exception as exc: + self._log_decode_error("get_type_failed", repr(exc)) + return False + try: + if msg_type in ("RAW_IMU", "SCALED_IMU2"): + return self._handle_imu(msg) + if msg_type == "ATTITUDE": + return self._handle_attitude(msg) + if msg_type in ("GPS_RAW_INT", "GPS2_RAW"): + return self._handle_gps(msg) + if msg_type == "HEARTBEAT": + return self._handle_heartbeat(msg) + if msg_type == "STATUSTEXT": + self._handle_statustext(msg) + return False + except Exception as exc: + self._log_decode_error(msg_type, repr(exc)) + return False + return False + + def run_decode_loop(self, *, poll_timeout_s: float = 0.5) -> None: + """Continuous decode loop; honours :meth:`stop`.""" + while not self._stop_flag.is_set(): + try: + msg = self._source.recv_match( + type=list(AP_MESSAGE_TYPES), + blocking=True, + timeout=poll_timeout_s, + ) + except Exception as exc: + self._log_decode_error("recv_match_failed", repr(exc)) + continue + self.feed_one_message(msg) + + def stop(self) -> None: + self._stop_flag.set() + + # ------------------------------------------------------------------ + # Per-type handlers + + def _handle_imu(self, msg: Any) -> bool: + # pymavlink RAW_IMU exposes raw signed-16 ticks; we forward as-is. + accel = (float(msg.xacc), float(msg.yacc), float(msg.zacc)) + gyro = (float(msg.xgyro), float(msg.ygyro), float(msg.zgyro)) + # `time_usec` is FC-side; we override with monotonic_ns at decode boundary + # for the FdrTelemetryFrame.received_at; the payload's ts_ns is the + # FC sensor stamp (epoch us -> ns). + sensor_ts_ns = int(msg.time_usec) * 1000 + payload = ImuTelemetrySample( + ts_ns=sensor_ts_ns, + accel_xyz=accel, + gyro_xyz=gyro, + ) + return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring) + + def _handle_attitude(self, msg: Any) -> bool: + sensor_ts_ns = int(msg.time_boot_ms) * 1_000_000 + payload = AttitudeSample( + ts_ns=sensor_ts_ns, + roll_rad=float(msg.roll), + pitch_rad=float(msg.pitch), + yaw_rad=float(msg.yaw), + ) + return self._dispatch(TelemetryKind.ATTITUDE, payload, ring=self.attitude_ring) + + def _handle_gps(self, msg: Any) -> bool: + fix_type = int(msg.fix_type) + status = self._map_fix_type(fix_type) + if status is GpsStatus.STABLE: + status = self._maybe_promote_to_spoofed_or_non_spoofed() + captured_at = time.monotonic_ns() + payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at) + # AC-5.1: cache warm-start hint on first 3D+ fix. + if fix_type >= 3: + lat_deg = int(msg.lat) / 1e7 + lon_deg = int(msg.lon) / 1e7 + alt_m = int(msg.alt) / 1000.0 + with self._lock: + if self._warm_start_hint is None: + self._warm_start_hint = LatLonAlt(lat_deg, lon_deg, alt_m) + self._warm_start_hint_at = captured_at + return self._dispatch(TelemetryKind.GPS_HEALTH, payload, ring=self.gps_ring) + + def _handle_heartbeat(self, msg: Any) -> bool: + captured_at = time.monotonic_ns() + state = self._map_mav_state( + system_status=int(msg.system_status), + base_mode=int(msg.base_mode), + ) + with self._lock: + hint = self._warm_start_hint + hint_at = self._warm_start_hint_at + last_age_ms: int | None = None + if hint_at is not None: + last_age_ms = max(0, (captured_at - hint_at) // 1_000_000) + payload = FlightStateSignal( + state=state, + last_valid_gps_hint_wgs84=hint, + last_valid_gps_age_ms=last_age_ms, + captured_at=captured_at, + ) + return self._dispatch(TelemetryKind.MAV_STATE, payload, ring=self.state_ring) + + def _handle_statustext(self, msg: Any) -> None: + text = msg.text + if isinstance(text, bytes): + text = text.decode("utf-8", errors="replace") + if not any(sentinel.lower() in text.lower() for sentinel in _SPOOFING_SENTINELS): + return + captured_at = time.monotonic_ns() + with self._lock: + self._spoof_sentinel_seen_at = captured_at + self._log.warning( + f"c8.inbound.spoofing_sentinel_seen: {text!r}", + extra={ + "kind": _SPOOFING_DETECTED_KIND, + "kv": {"text": text, "captured_at": captured_at}, + }, + ) + + # ------------------------------------------------------------------ + # Dispatch helpers + + def _dispatch( + self, + kind: TelemetryKind, + payload: Any, + *, + ring: TelemetryRing[FcTelemetryFrame], + ) -> bool: + received_at = time.monotonic_ns() + last = self._last_ts_ns.get(kind) + if last is not None and received_at <= last: + self._log.warning( + f"c8.inbound.out_of_order_frame_dropped: kind={kind.name} " + f"prev_ns={last} this_ns={received_at}", + extra={ + "kind": _OUT_OF_ORDER_KIND, + "kv": { + "telemetry_kind": kind.name, + "prev_ns": last, + "this_ns": received_at, + }, + }, + ) + return False + self._last_ts_ns[kind] = received_at + frame = FcTelemetryFrame( + kind=kind, + payload=payload, + received_at=received_at, + signed=True, # AP path uses MAVLink 2.0 signing per AZ-395 (assumed true) + ) + ring.push(frame) + self._bus.dispatch(frame) + return True + + def _log_decode_error(self, msg_type: str, error_repr: str) -> None: + self._log.debug( + f"c8.inbound.decode_error: msg_type={msg_type} {error_repr}", + extra={ + "kind": _DECODE_ERROR_KIND, + "kv": {"msg_type": msg_type, "error": error_repr}, + }, + ) + + @staticmethod + def _map_fix_type(fix_type: int) -> GpsStatus: + if fix_type in _FIX_TYPE_NO_FIX_OR_NONE: + return GpsStatus.NO_FIX + if fix_type == _FIX_TYPE_2D: + return GpsStatus.DEGRADED + return GpsStatus.STABLE + + def _maybe_promote_to_spoofed_or_non_spoofed(self) -> GpsStatus: + # AC-3: AP has no in-band signed-fix flag; we use STATUSTEXT + # spoofing sentinels seen within the last 5 s as a SPOOFED signal. + with self._lock: + sentinel_at = self._spoof_sentinel_seen_at + if sentinel_at is None: + return GpsStatus.STABLE + now = time.monotonic_ns() + if (now - sentinel_at) <= 5 * 1_000_000_000: + return GpsStatus.SPOOFED + return GpsStatus.STABLE + + @staticmethod + def _map_mav_state(*, system_status: int, base_mode: int) -> FlightState: + if system_status in (_MAV_STATE_UNINIT, _MAV_STATE_BOOT, _MAV_STATE_CALIBRATING): + return FlightState.INIT + if system_status in ( + _MAV_STATE_CRITICAL, + _MAV_STATE_EMERGENCY, + _MAV_STATE_FLIGHT_TERMINATION, + ): + return FlightState.FAILED + if system_status == _MAV_STATE_ACTIVE: + return FlightState.IN_FLIGHT + if system_status == _MAV_STATE_STANDBY: + if base_mode & _MAV_MODE_FLAG_SAFETY_ARMED: + return FlightState.ARMED + return FlightState.ON_GROUND + if system_status == _MAV_STATE_POWEROFF: + return FlightState.ON_GROUND + return FlightState.INIT diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py new file mode 100644 index 0000000..9cf9a1f --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_inbound_msp2.py @@ -0,0 +1,270 @@ +"""iNav MSP2 inbound decoder (AZ-391 / E-C8). + +MSP2 is request/response over a serial transport (YAMSPy ``MSPy``); +unlike MAVLink it doesn't push telemetry. The decoder runs a poll +loop that calls into a thin ``MspSource`` adapter and translates the +returned dictionaries into the unified :class:`FcTelemetryFrame` +shape (same payload types as the AP path so consumers don't care +which FC the wire originated from). + +iNav has no spoofing-detection telemetry (RESTRICT-COMM-2 + +spec AC-5); ``GpsStatus.SPOOFED`` is unreachable on this path. + +Tests drive the decoder via :meth:`feed_one_tick` which calls the +``MspSource`` poll methods once — no real serial port required. +""" + +from __future__ import annotations + +import threading +import time +from typing import Any, Final, Protocol + +from gps_denied_onboard._types.fc import ( + AttitudeSample, + FcTelemetryFrame, + FlightState, + FlightStateSignal, + GpsHealth, + GpsStatus, + ImuTelemetrySample, + TelemetryKind, +) +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus +from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing +from gps_denied_onboard.logging import get_logger + +__all__ = [ + "Msp2InavInboundDecoder", + "MspSource", +] + + +_OUT_OF_ORDER_KIND: Final[str] = "c8.inbound.out_of_order_frame_dropped" +_DECODE_ERROR_KIND: Final[str] = "c8.inbound.decode_error" + + +class MspSource(Protocol): + """Minimal MSPy-shaped interface the decoder polls. + + Concrete production implementation wraps a ``yamspy.MSPy`` + connection; the test harness uses a deterministic dict-returning + fake. + """ + + def read_imu(self) -> dict[str, Any]: ... + + def read_attitude(self) -> dict[str, Any]: ... + + def read_gps(self) -> dict[str, Any]: ... + + def read_flight_state(self) -> dict[str, Any]: ... + + +class Msp2InavInboundDecoder: + """iNav MSP2 polling decoder + dispatcher.""" + + def __init__( + self, + source: MspSource, + bus: SubscriptionBus, + *, + imu_ring_capacity: int = 200, + attitude_ring_capacity: int = 100, + gps_ring_capacity: int = 20, + state_ring_capacity: int = 10, + ) -> None: + self._source = source + self._bus = bus + self._log = get_logger("c8_fc_adapter.inbound_msp2") + self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + imu_ring_capacity, kind_name="imu" + ) + self.attitude_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + attitude_ring_capacity, kind_name="attitude" + ) + self.gps_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + gps_ring_capacity, kind_name="gps_health" + ) + self.state_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing( + state_ring_capacity, kind_name="flight_state" + ) + self._last_ts_ns: dict[TelemetryKind, int] = {} + self._warm_start_hint: LatLonAlt | None = None + self._warm_start_hint_at: int | None = None + self._stop_flag = threading.Event() + self._lock = threading.Lock() + + @property + def warm_start_hint(self) -> LatLonAlt | None: + with self._lock: + return self._warm_start_hint + + def feed_one_tick(self) -> int: + """Poll source once for IMU/attitude/GPS/state; returns # dispatched frames. + + IMU and attitude are polled every tick; GPS + state can be + polled at a lower rate by the production driver — we still + poll them here for AC simplicity. + """ + dispatched = 0 + for handler in (self._tick_imu, self._tick_attitude, self._tick_gps, self._tick_state): + try: + if handler(): + dispatched += 1 + except Exception as exc: + self._log_decode_error(handler.__name__, repr(exc)) + return dispatched + + def run_poll_loop(self, *, period_s: float = 0.01) -> None: + """Continuous polling loop; honours :meth:`stop`.""" + while not self._stop_flag.is_set(): + self.feed_one_tick() + time.sleep(period_s) + + def stop(self) -> None: + self._stop_flag.set() + + # ------------------------------------------------------------------ + # Per-tick handlers + + def _tick_imu(self) -> bool: + raw = self._source.read_imu() + if raw is None: + return False + # iNav MSP2 IMU dict: 'accelerometer': [x, y, z], 'gyroscope': [x, y, z] + # plus a 'time_ms' if the source surfaces it (YAMSPy doesn't include + # FC-side time, so we use monotonic_ns at decode boundary). + accel = tuple(float(v) for v in raw["accelerometer"]) + gyro = tuple(float(v) for v in raw["gyroscope"]) + if len(accel) != 3 or len(gyro) != 3: + raise ValueError( + f"iNav IMU dict shape: expected 3-vectors, got accel={accel}, gyro={gyro}" + ) + sensor_ts_ns = time.monotonic_ns() + payload = ImuTelemetrySample(ts_ns=sensor_ts_ns, accel_xyz=accel, gyro_xyz=gyro) + return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring) + + def _tick_attitude(self) -> bool: + raw = self._source.read_attitude() + if raw is None: + return False + # iNav attitude dict: 'angx' (roll), 'angy' (pitch), 'heading' (yaw) — in degrees + # The MSP_ATTITUDE message returns: roll/pitch as 10ths of a deg, + # heading as -180..180 deg. We accept the YAMSPy-decoded dict shape + # which uses the same keys. + roll_rad = float(raw["angx"]) * (3.141592653589793 / 180.0) + pitch_rad = float(raw["angy"]) * (3.141592653589793 / 180.0) + yaw_rad = float(raw["heading"]) * (3.141592653589793 / 180.0) + sensor_ts_ns = time.monotonic_ns() + payload = AttitudeSample( + ts_ns=sensor_ts_ns, + roll_rad=roll_rad, + pitch_rad=pitch_rad, + yaw_rad=yaw_rad, + ) + return self._dispatch(TelemetryKind.ATTITUDE, payload, ring=self.attitude_ring) + + def _tick_gps(self) -> bool: + raw = self._source.read_gps() + if raw is None: + return False + # iNav MSP_RAW_GPS / MSP2_INAV_GPS dict: + # 'fix' (0/1/2), 'numSat', 'lat' (deg * 1e7), 'lon' (deg * 1e7), 'alt' (m) + # We translate iNav's coarser fix flag (0=no fix, 1=2D, 2+=3D) to our enum. + fix = int(raw["fix"]) + if fix == 0: + status = GpsStatus.NO_FIX + elif fix == 1: + status = GpsStatus.DEGRADED + else: + status = GpsStatus.STABLE + captured_at = time.monotonic_ns() + if fix >= 2: + lat_deg = float(raw["lat"]) / 1e7 + lon_deg = float(raw["lon"]) / 1e7 + alt_m = float(raw.get("alt", 0.0)) + with self._lock: + if self._warm_start_hint is None: + self._warm_start_hint = LatLonAlt(lat_deg, lon_deg, alt_m) + self._warm_start_hint_at = captured_at + payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at) + return self._dispatch(TelemetryKind.GPS_HEALTH, payload, ring=self.gps_ring) + + def _tick_state(self) -> bool: + raw = self._source.read_flight_state() + if raw is None: + return False + # iNav flight-state dict shape (subset we honour): + # 'armed': bool, 'in_flight': bool, 'failsafe': bool + captured_at = time.monotonic_ns() + if raw.get("failsafe", False): + state = FlightState.FAILED + elif raw.get("in_flight", False): + state = FlightState.IN_FLIGHT + elif raw.get("armed", False): + state = FlightState.ARMED + elif raw.get("initialised", False): + state = FlightState.ON_GROUND + else: + state = FlightState.INIT + with self._lock: + hint = self._warm_start_hint + hint_at = self._warm_start_hint_at + last_age_ms: int | None = None + if hint_at is not None: + last_age_ms = max(0, (captured_at - hint_at) // 1_000_000) + payload = FlightStateSignal( + state=state, + last_valid_gps_hint_wgs84=hint, + last_valid_gps_age_ms=last_age_ms, + captured_at=captured_at, + ) + return self._dispatch(TelemetryKind.MAV_STATE, payload, ring=self.state_ring) + + # ------------------------------------------------------------------ + # Dispatch helpers + + def _dispatch( + self, + kind: TelemetryKind, + payload: Any, + *, + ring: TelemetryRing[FcTelemetryFrame], + ) -> bool: + received_at = time.monotonic_ns() + last = self._last_ts_ns.get(kind) + if last is not None and received_at <= last: + self._log.warning( + f"c8.inbound.out_of_order_frame_dropped: kind={kind.name} " + f"prev_ns={last} this_ns={received_at}", + extra={ + "kind": _OUT_OF_ORDER_KIND, + "kv": { + "telemetry_kind": kind.name, + "prev_ns": last, + "this_ns": received_at, + }, + }, + ) + return False + self._last_ts_ns[kind] = received_at + frame = FcTelemetryFrame( + kind=kind, + payload=payload, + received_at=received_at, + signed=False, # iNav has no signing per RESTRICT-COMM-2 + ) + ring.push(frame) + self._bus.dispatch(frame) + return True + + def _log_decode_error(self, handler_name: str, error_repr: str) -> None: + self._log.debug( + f"c8.inbound.decode_error: handler={handler_name} {error_repr}", + extra={ + "kind": _DECODE_ERROR_KIND, + "kv": {"handler": handler_name, "error": error_repr}, + }, + ) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_subscription.py b/src/gps_denied_onboard/components/c8_fc_adapter/_subscription.py new file mode 100644 index 0000000..936c77e --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_subscription.py @@ -0,0 +1,96 @@ +"""Multi-subscriber telemetry fan-out (AZ-391 / E-C8). + +The single decode thread per adapter pushes :class:`FcTelemetryFrame` +into a :class:`SubscriptionBus`; the bus fans it out synchronously to +every registered callback. Cancelling a subscription removes the +callback from the next dispatch tick. + +Why synchronous fan-out and not per-subscriber threads: + +- ``FcAdapter`` callers (C1 VIO, C5 StateEstimator) own their own + decode-side ring buffers; the bus's responsibility is dispatch only. +- The contract says callbacks fire ON the decoder thread (Invariant + 8). Adding queue/thread per subscriber would (a) increase latency, + (b) hide cross-callback ordering bugs, (c) require shutdown-order + guarantees we don't actually need. + +Slow subscribers MUST not block the decode loop beyond 100 µs per +the AZ-391 constraint. The bus does NOT enforce this — subscribers +are trusted to use a non-blocking enqueue + drain on their own +thread. (See `_inbound_mavlink._fire_callbacks` for the catch + +DEBUG log path that prevents a subscriber crash from killing the +decode loop.) +""" + +from __future__ import annotations + +import itertools +import threading +from typing import Final + +from gps_denied_onboard._types.fc import FcTelemetryFrame, TelemetryCallback +from gps_denied_onboard.logging import get_logger + +__all__ = ["SubscriptionBus", "SubscriptionHandle"] + + +_CALLBACK_ERROR_KIND: Final[str] = "c8.inbound.callback_error" + + +class SubscriptionHandle: + """Concrete `Subscription` Protocol implementation.""" + + def __init__(self, bus: SubscriptionBus, sub_id: int) -> None: + self._bus = bus + self._sub_id = sub_id + + def cancel(self) -> None: + self._bus._cancel(self._sub_id) + + +class SubscriptionBus: + """Lock-guarded subscriber registry + fan-out dispatch.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._subscribers: dict[int, TelemetryCallback] = {} + self._next_id = itertools.count(1) + self._log = get_logger("c8_fc_adapter.subscription_bus") + + def subscribe(self, callback: TelemetryCallback) -> SubscriptionHandle: + with self._lock: + sub_id = next(self._next_id) + self._subscribers[sub_id] = callback + return SubscriptionHandle(self, sub_id) + + def _cancel(self, sub_id: int) -> None: + with self._lock: + self._subscribers.pop(sub_id, None) + + @property + def subscriber_count(self) -> int: + with self._lock: + return len(self._subscribers) + + def dispatch(self, frame: FcTelemetryFrame) -> None: + """Fan ``frame`` out to every currently-registered subscriber. + + Subscriber exceptions are caught + DEBUG-logged so one bad + subscriber cannot kill the decode loop (Invariant 8). The + callback list is snapshot under the lock; callbacks fire OUTSIDE + the lock so a callback that calls back into ``subscribe`` / + ``cancel`` does not deadlock. + """ + with self._lock: + callbacks = list(self._subscribers.items()) + for sub_id, cb in callbacks: + try: + cb(frame) + except Exception as exc: + self._log.debug( + f"c8.inbound.callback_error: sub_id={sub_id} {exc!r}", + extra={ + "kind": _CALLBACK_ERROR_KIND, + "kv": {"sub_id": sub_id, "error": repr(exc)}, + }, + ) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/_telemetry_rings.py b/src/gps_denied_onboard/components/c8_fc_adapter/_telemetry_rings.py new file mode 100644 index 0000000..69ef863 --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/_telemetry_rings.py @@ -0,0 +1,112 @@ +"""Bounded per-kind telemetry rings (AZ-391 / E-C8). + +Single decode thread pushes; arbitrary number of consumer threads +``peek`` (lock-free read of the latest sample) and ``snapshot`` (copy +of the entire ring under the lock). Drop-oldest on overflow — emits a +single INFO log at the FIRST overflow per ring instance (subsequent +overflows are counted but not logged to avoid drowning the log). + +Why a custom ring and not :class:`collections.deque(maxlen=N)`: + +- We need a single INFO log on the FIRST overflow event, plus a + monotonic dropped-counter that survives across overflows. +- ``deque`` push is GIL-fast but the dropped-counter would still need + a separate lock; consolidating both behind a single ring is simpler + and gives ``peek_latest`` for free as O(1). + +The ring is thread-safe at the Python level (one lock guards both +``push`` and ``snapshot``). Decoder callbacks fire OUTSIDE the ring +lock per Invariant 8. +""" + +from __future__ import annotations + +import threading +from collections import deque +from collections.abc import Iterable +from typing import Final, Generic, TypeVar + +from gps_denied_onboard.logging import get_logger + +__all__ = ["TelemetryRing"] + + +T = TypeVar("T") + +_OVERFLOW_KIND: Final[str] = "c8.inbound.ring_overflow" + + +class TelemetryRing(Generic[T]): + """Bounded drop-oldest ring buffer for one telemetry kind.""" + + def __init__(self, capacity: int, *, kind_name: str) -> None: + if capacity <= 0: + raise ValueError(f"TelemetryRing capacity must be > 0; got {capacity}") + self._capacity = capacity + self._kind_name = kind_name + self._lock = threading.Lock() + self._items: deque[T] = deque(maxlen=capacity) + self._dropped_count = 0 + self._overflow_logged = False + self._log = get_logger("c8_fc_adapter.telemetry_ring") + + @property + def capacity(self) -> int: + return self._capacity + + @property + def kind_name(self) -> str: + return self._kind_name + + @property + def dropped_count(self) -> int: + with self._lock: + return self._dropped_count + + def push(self, item: T) -> bool: + """Push ``item`` (drop-oldest on overflow). Returns ``False`` on drop.""" + with self._lock: + if len(self._items) >= self._capacity: + self._dropped_count += 1 + should_log_first = not self._overflow_logged + self._items.append(item) + if should_log_first: + self._overflow_logged = True + else: + self._items.append(item) + should_log_first = False + + if should_log_first: + self._log.info( + f"c8.inbound.ring_overflow: kind={self._kind_name} capacity={self._capacity}", + extra={ + "kind": _OVERFLOW_KIND, + "kv": { + "ring_kind": self._kind_name, + "capacity": self._capacity, + }, + }, + ) + return False + return True + + def peek_latest(self) -> T | None: + """Return the most-recently-pushed item, or None if empty.""" + with self._lock: + if not self._items: + return None + return self._items[-1] + + def snapshot(self) -> list[T]: + """Return a shallow copy of the ring's contents (oldest -> newest).""" + with self._lock: + return list(self._items) + + def __len__(self) -> int: + with self._lock: + return len(self._items) + + def extend(self, items: Iterable[T]) -> None: + """Push multiple items; per-item drop accounting still applies.""" + for item in items: + self.push(item) diff --git a/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py b/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py new file mode 100644 index 0000000..b794fae --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_az391_inbound_subscription.py @@ -0,0 +1,713 @@ +"""AZ-391 — C8 inbound subscription / decoder / rings / fan-out unit tests. + +Covers all 10 ACs of AZ-391 + the named NFRs that can be verified +inside a unit test (the ≥ 200 Hz sustain budget belongs in C8-PT-01). +""" + +from __future__ import annotations + +import logging +import threading +import time +from types import SimpleNamespace +from typing import Any +from unittest import mock + +import pytest + +from gps_denied_onboard._types.fc import ( + AttitudeSample, + FcTelemetryFrame, + FlightState, + FlightStateSignal, + GpsHealth, + GpsStatus, + ImuTelemetrySample, + TelemetryKind, +) +from gps_denied_onboard.components.c8_fc_adapter._inbound_mavlink import ( + PymavlinkInboundDecoder, +) +from gps_denied_onboard.components.c8_fc_adapter._inbound_msp2 import ( + Msp2InavInboundDecoder, +) +from gps_denied_onboard.components.c8_fc_adapter._subscription import ( + SubscriptionBus, +) +from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import ( + TelemetryRing, +) + +# ---------------------------------------------------------------------- +# Helpers + + +def _ap_msg(msg_type: str, **fields: Any) -> Any: + """Build a pymavlink-style stub message with `get_type()` + fields.""" + ns = SimpleNamespace(**fields) + ns.get_type = lambda: msg_type + return ns + + +def _make_decoder(bus: SubscriptionBus | None = None) -> PymavlinkInboundDecoder: + if bus is None: + bus = SubscriptionBus() + src = mock.MagicMock() + return PymavlinkInboundDecoder(src, bus) + + +# ---------------------------------------------------------------------- +# AC-1: AP RAW_IMU decode + + +def test_ac1_ap_raw_imu_decode() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + msg = _ap_msg( + "RAW_IMU", + time_usec=12345, + xacc=10, + yacc=20, + zacc=-981, + xgyro=1, + ygyro=2, + zgyro=3, + ) + + # Act + dispatched = decoder.feed_one_message(msg) + + # Assert + assert dispatched is True + assert len(received) == 1 + frame = received[0] + assert frame.kind is TelemetryKind.IMU_SAMPLE + assert isinstance(frame.payload, ImuTelemetrySample) + assert frame.payload.accel_xyz == (10.0, 20.0, -981.0) + assert frame.payload.gyro_xyz == (1.0, 2.0, 3.0) + # received_at is monotonic_ns at decode boundary (positive, non-zero) + assert frame.received_at > 0 + + +# ---------------------------------------------------------------------- +# AC-2: AP ATTITUDE decode + + +def test_ac2_ap_attitude_decode() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + msg = _ap_msg( + "ATTITUDE", + time_boot_ms=5000, + roll=0.1, + pitch=-0.2, + yaw=1.5, + rollspeed=0.0, + pitchspeed=0.0, + yawspeed=0.0, + ) + + # Act + decoder.feed_one_message(msg) + + # Assert + assert len(received) == 1 + assert received[0].kind is TelemetryKind.ATTITUDE + payload = received[0].payload + assert isinstance(payload, AttitudeSample) + assert payload.roll_rad == pytest.approx(0.1) + assert payload.pitch_rad == pytest.approx(-0.2) + assert payload.yaw_rad == pytest.approx(1.5) + + +# ---------------------------------------------------------------------- +# AC-3: AP GPS_RAW_INT -> GpsHealth fix-type mapping + + +@pytest.mark.parametrize( + "fix_type, expected_status", + [ + (0, GpsStatus.NO_FIX), + (1, GpsStatus.NO_FIX), + (2, GpsStatus.DEGRADED), + (3, GpsStatus.STABLE), + (4, GpsStatus.STABLE), # DGPS + (5, GpsStatus.STABLE), # RTK_FLOAT + (6, GpsStatus.STABLE), # RTK_FIXED + ], +) +def test_ac3_ap_gps_fix_type_mapping(fix_type: int, expected_status: GpsStatus) -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + msg = _ap_msg( + "GPS_RAW_INT", + time_usec=0, + fix_type=fix_type, + lat=505000000, + lon=300000000, + alt=100000, + eph=0, + epv=0, + vel=0, + cog=0, + satellites_visible=8, + alt_ellipsoid=0, + h_acc=0, + v_acc=0, + vel_acc=0, + hdg_acc=0, + yaw=0, + ) + + # Act + decoder.feed_one_message(msg) + + # Assert + assert isinstance(received[0].payload, GpsHealth) + assert received[0].payload.status is expected_status + + +def test_ac3_spoofing_sentinel_promotes_to_spoofed() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + statustext = _ap_msg("STATUSTEXT", severity=4, text="GPS spoofing detected!", id=0, chunk_seq=0) + gps_msg = _ap_msg( + "GPS_RAW_INT", + time_usec=0, + fix_type=3, + lat=505000000, + lon=300000000, + alt=100000, + eph=0, + epv=0, + vel=0, + cog=0, + satellites_visible=8, + alt_ellipsoid=0, + h_acc=0, + v_acc=0, + vel_acc=0, + hdg_acc=0, + yaw=0, + ) + + # Act + decoder.feed_one_message(statustext) + decoder.feed_one_message(gps_msg) + + # Assert — the GPS frame after the sentinel is promoted to SPOOFED + gps_frames = [f for f in received if f.kind is TelemetryKind.GPS_HEALTH] + assert len(gps_frames) == 1 + assert gps_frames[0].payload.status is GpsStatus.SPOOFED + + +# ---------------------------------------------------------------------- +# AC-4: AP HEARTBEAT -> FlightState mapping + + +@pytest.mark.parametrize( + "system_status, base_mode, expected_state", + [ + (0, 0, FlightState.INIT), # UNINIT + (1, 0, FlightState.INIT), # BOOT + (2, 0, FlightState.INIT), # CALIBRATING + (3, 0x00, FlightState.ON_GROUND), # STANDBY, disarmed + (3, 0x80, FlightState.ARMED), # STANDBY, armed + (4, 0x80, FlightState.IN_FLIGHT), # ACTIVE + (5, 0, FlightState.FAILED), # CRITICAL + (6, 0, FlightState.FAILED), # EMERGENCY + (7, 0, FlightState.ON_GROUND), # POWEROFF + (8, 0, FlightState.FAILED), # FLIGHT_TERMINATION + ], +) +def test_ac4_heartbeat_to_flight_state( + system_status: int, base_mode: int, expected_state: FlightState +) -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + msg = _ap_msg( + "HEARTBEAT", + type=0, + autopilot=3, + base_mode=base_mode, + custom_mode=0, + system_status=system_status, + mavlink_version=2, + ) + + # Act + decoder.feed_one_message(msg) + + # Assert + assert received[0].kind is TelemetryKind.MAV_STATE + payload = received[0].payload + assert isinstance(payload, FlightStateSignal) + assert payload.state is expected_state + + +# ---------------------------------------------------------------------- +# AC-5: iNav MSP2 decode (same DTOs as AP) + + +class _FakeMspSource: + def __init__( + self, + imu: dict | None = None, + attitude: dict | None = None, + gps: dict | None = None, + state: dict | None = None, + ) -> None: + self._imu = imu + self._attitude = attitude + self._gps = gps + self._state = state + + def read_imu(self) -> dict | None: + return self._imu + + def read_attitude(self) -> dict | None: + return self._attitude + + def read_gps(self) -> dict | None: + return self._gps + + def read_flight_state(self) -> dict | None: + return self._state + + +def test_ac5_inav_msp2_imu_decode() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + source = _FakeMspSource( + imu={"accelerometer": [11, 22, -33], "gyroscope": [1, 2, 3]}, + ) + decoder = Msp2InavInboundDecoder(source, bus) + + # Act + decoder.feed_one_tick() + + # Assert + imu_frames = [f for f in received if f.kind is TelemetryKind.IMU_SAMPLE] + assert len(imu_frames) == 1 + payload = imu_frames[0].payload + assert isinstance(payload, ImuTelemetrySample) + assert payload.accel_xyz == (11.0, 22.0, -33.0) + assert payload.gyro_xyz == (1.0, 2.0, 3.0) + # iNav is unsigned per RESTRICT-COMM-2 + assert imu_frames[0].signed is False + + +def test_ac5_inav_msp2_attitude_decode() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + source = _FakeMspSource(attitude={"angx": 180.0, "angy": 0.0, "heading": 0.0}) + decoder = Msp2InavInboundDecoder(source, bus) + + # Act + decoder.feed_one_tick() + + # Assert + att = [f for f in received if f.kind is TelemetryKind.ATTITUDE] + assert len(att) == 1 + payload = att[0].payload + assert isinstance(payload, AttitudeSample) + # 180 deg -> pi rad + assert payload.roll_rad == pytest.approx(3.141592653589793) + + +def test_ac5_inav_spoofed_status_unreachable() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + source = _FakeMspSource(gps={"fix": 2, "lat": 505000000, "lon": 300000000, "alt": 100.0}) + decoder = Msp2InavInboundDecoder(source, bus) + + # Act + decoder.feed_one_tick() + + # Assert — iNav with valid fix is STABLE; SPOOFED is unreachable + gps = [f for f in received if f.kind is TelemetryKind.GPS_HEALTH] + assert len(gps) == 1 + assert gps[0].payload.status is GpsStatus.STABLE + + +# ---------------------------------------------------------------------- +# AC-6: Bounded ring drop-oldest + + +def test_ac6_ring_drop_oldest_and_logs_first_overflow( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + ring: TelemetryRing[int] = TelemetryRing(capacity=100, kind_name="imu") + caplog.set_level(logging.INFO, logger="c8_fc_adapter.telemetry_ring") + + # Act + for i in range(1000): + ring.push(i) + + # Assert — ring contains the latest 100 + items = ring.snapshot() + assert len(items) == 100 + assert items == list(range(900, 1000)) + # Dropped 900 items (the first 900 entered before any drops; once full, + # each subsequent push drops one). + assert ring.dropped_count == 900 + # Exactly ONE overflow record (first overflow), not 900 of them. + overflow_logs = [ + r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.ring_overflow" + ] + assert len(overflow_logs) == 1 + assert overflow_logs[0].kv["ring_kind"] == "imu" + + +def test_ac6_ring_capacity_zero_rejected() -> None: + # Act + Assert + with pytest.raises(ValueError, match=r"capacity must be > 0"): + TelemetryRing(capacity=0, kind_name="imu") + + +# ---------------------------------------------------------------------- +# AC-7: Multi-subscriber fan-out + + +def test_ac7_multi_subscriber_fan_out_and_cancel() -> None: + # Arrange + bus = SubscriptionBus() + counters = [0, 0, 0] + + def make_cb(idx: int): + def _cb(_f: FcTelemetryFrame) -> None: + counters[idx] += 1 + + return _cb + + handles = [bus.subscribe(make_cb(i)) for i in range(3)] + frame = _make_frame(TelemetryKind.IMU_SAMPLE) + + # Act 1 — all 3 receive + bus.dispatch(frame) + + # Assert 1 + assert counters == [1, 1, 1] + + # Act 2 — cancel one, dispatch again + handles[1].cancel() + bus.dispatch(frame) + + # Assert 2 + assert counters == [2, 1, 2] + + +def test_ac7_subscriber_exception_does_not_kill_dispatch( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda _f: (_ for _ in ()).throw(RuntimeError("boom"))) + bus.subscribe(lambda f: received.append(f)) + caplog.set_level(logging.DEBUG, logger="c8_fc_adapter.subscription_bus") + frame = _make_frame(TelemetryKind.IMU_SAMPLE) + + # Act + bus.dispatch(frame) + + # Assert — second subscriber STILL fired + assert len(received) == 1 + # And the bad subscriber's exception was DEBUG-logged + err_logs = [ + r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.callback_error" + ] + assert len(err_logs) == 1 + + +# ---------------------------------------------------------------------- +# AC-8: Warm-start hint surfaces within 1 s of first GPS_RAW_INT + + +def test_ac8_warm_start_hint_present_after_first_gps() -> None: + # Arrange + bus = SubscriptionBus() + decoder = _make_decoder(bus) + # Before any GPS: hint is None + assert decoder.warm_start_hint is None + + # Act + msg = _ap_msg( + "GPS_RAW_INT", + time_usec=0, + fix_type=3, + lat=505000000, + lon=300000000, + alt=125000, # 50.5 / 30.0 / 125 m + eph=0, + epv=0, + vel=0, + cog=0, + satellites_visible=8, + alt_ellipsoid=0, + h_acc=0, + v_acc=0, + vel_acc=0, + hdg_acc=0, + yaw=0, + ) + decoder.feed_one_message(msg) + + # Assert + hint = decoder.warm_start_hint + assert hint is not None + assert hint.lat_deg == pytest.approx(50.5) + assert hint.lon_deg == pytest.approx(30.0) + assert hint.alt_m == pytest.approx(125.0) + + +def test_ac8_warm_start_hint_propagates_to_flight_state_signal() -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + gps = _ap_msg( + "GPS_RAW_INT", + time_usec=0, + fix_type=3, + lat=505000000, + lon=300000000, + alt=125000, + eph=0, + epv=0, + vel=0, + cog=0, + satellites_visible=8, + alt_ellipsoid=0, + h_acc=0, + v_acc=0, + vel_acc=0, + hdg_acc=0, + yaw=0, + ) + heartbeat = _ap_msg( + "HEARTBEAT", + type=0, + autopilot=3, + base_mode=0, + custom_mode=0, + system_status=3, + mavlink_version=2, + ) + + # Act + decoder.feed_one_message(gps) + decoder.feed_one_message(heartbeat) + + # Assert + state_frames = [f for f in received if f.kind is TelemetryKind.MAV_STATE] + assert len(state_frames) == 1 + payload = state_frames[0].payload + assert payload.last_valid_gps_hint_wgs84 is not None + assert payload.last_valid_gps_age_ms is not None + + +# ---------------------------------------------------------------------- +# AC-9: Out-of-order drop + WARN + + +def test_ac9_out_of_order_dropped_with_warn(caplog: pytest.LogCaptureFixture) -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + caplog.set_level(logging.WARNING, logger="c8_fc_adapter.inbound_mavlink") + # First message + msg1 = _ap_msg( + "ATTITUDE", + time_boot_ms=1000, + roll=0.0, + pitch=0.0, + yaw=0.0, + rollspeed=0.0, + pitchspeed=0.0, + yawspeed=0.0, + ) + decoder.feed_one_message(msg1) + # Capture the first frame's received_at; we'll force the next frame to be + # apparently older by injecting a manipulated `_last_ts_ns` entry. + # Force the decoder's per-kind last-ts way into the future so any next + # frame is "before" it. + decoder._last_ts_ns[TelemetryKind.ATTITUDE] = time.monotonic_ns() + 10_000_000_000 + + # Act + msg2 = _ap_msg( + "ATTITUDE", + time_boot_ms=2000, + roll=0.1, + pitch=0.1, + yaw=0.1, + rollspeed=0.0, + pitchspeed=0.0, + yawspeed=0.0, + ) + decoder.feed_one_message(msg2) + + # Assert + att_frames = [f for f in received if f.kind is TelemetryKind.ATTITUDE] + assert len(att_frames) == 1 # second frame dropped + warn_logs = [ + r + for r in caplog.records + if getattr(r, "kind", None) == "c8.inbound.out_of_order_frame_dropped" + ] + assert len(warn_logs) == 1 + assert warn_logs[0].kv["telemetry_kind"] == "ATTITUDE" + + +# ---------------------------------------------------------------------- +# AC-10: Decode-error isolation + + +def test_ac10_corrupt_frame_does_not_kill_decoder( + caplog: pytest.LogCaptureFixture, +) -> None: + # Arrange + bus = SubscriptionBus() + received: list[FcTelemetryFrame] = [] + bus.subscribe(lambda f: received.append(f)) + decoder = _make_decoder(bus) + caplog.set_level(logging.DEBUG, logger="c8_fc_adapter.inbound_mavlink") + # Corrupt RAW_IMU — `xacc` field missing (AttributeError) on access + corrupt = SimpleNamespace() + corrupt.get_type = lambda: "RAW_IMU" + # Good RAW_IMU immediately after + good = _ap_msg( + "RAW_IMU", + time_usec=0, + xacc=1, + yacc=2, + zacc=3, + xgyro=4, + ygyro=5, + zgyro=6, + ) + + # Act + decoder.feed_one_message(corrupt) + decoder.feed_one_message(good) + + # Assert — bad frame logged, good frame dispatched + err_logs = [r for r in caplog.records if getattr(r, "kind", None) == "c8.inbound.decode_error"] + assert len(err_logs) == 1 + imu_frames = [f for f in received if f.kind is TelemetryKind.IMU_SAMPLE] + assert len(imu_frames) == 1 + assert imu_frames[0].payload.accel_xyz == (1.0, 2.0, 3.0) + + +# ---------------------------------------------------------------------- +# NFR: IMU callback latency under 1 ms (loose budget for unit test) + + +def test_nfr_perf_imu_callback_under_1ms() -> None: + # Arrange + bus = SubscriptionBus() + bus.subscribe(lambda _f: None) + decoder = _make_decoder(bus) + + iters = 200 + msgs = [ + _ap_msg( + "RAW_IMU", + time_usec=i, + xacc=i, + yacc=i, + zacc=i, + xgyro=i, + ygyro=i, + zgyro=i, + ) + for i in range(iters) + ] + + # Act + start = time.perf_counter() + for m in msgs: + decoder.feed_one_message(m) + avg_s = (time.perf_counter() - start) / iters + + # Assert + assert avg_s < 1e-3, f"avg {avg_s * 1e3:.3f}ms > 1ms budget" + + +# ---------------------------------------------------------------------- +# NFR: Thread-safety smoke (single-producer / single-consumer over a ring) + + +def test_nfr_ring_thread_safety_smoke() -> None: + # Arrange + ring: TelemetryRing[int] = TelemetryRing(capacity=1024, kind_name="imu") + n = 5000 + + def producer() -> None: + for i in range(n): + ring.push(i) + + def consumer_snapshots() -> None: + for _ in range(n): + ring.snapshot() + + # Act + threads = [ + threading.Thread(target=producer), + threading.Thread(target=consumer_snapshots), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + # Assert — no crash; ring still has bounded size + assert len(ring) <= 1024 + + +# ---------------------------------------------------------------------- +# Helpers (factories for test frames) + + +def _make_frame(kind: TelemetryKind) -> FcTelemetryFrame: + if kind is TelemetryKind.IMU_SAMPLE: + payload: Any = ImuTelemetrySample( + ts_ns=0, accel_xyz=(0.0, 0.0, 0.0), gyro_xyz=(0.0, 0.0, 0.0) + ) + elif kind is TelemetryKind.ATTITUDE: + payload = AttitudeSample(ts_ns=0, roll_rad=0.0, pitch_rad=0.0, yaw_rad=0.0) + elif kind is TelemetryKind.GPS_HEALTH: + payload = GpsHealth(status=GpsStatus.STABLE, fix_age_ms=0, captured_at=0) + else: + payload = FlightStateSignal( + state=FlightState.INIT, + last_valid_gps_hint_wgs84=None, + last_valid_gps_age_ms=None, + captured_at=0, + ) + return FcTelemetryFrame(kind=kind, payload=payload, received_at=0, signed=False)