[AZ-391] C8 inbound: MAVLink + MSP2 decoders + rings + bus + warm-start

Adds the C8 inbound producer side:
- TelemetryRing[T]: bounded drop-oldest ring; first-overflow INFO log
  + monotonic dropped_count.
- SubscriptionBus + SubscriptionHandle: synchronous fan-out, lock-
  released-before-callback to avoid deadlock; subscriber crash caught
  + DEBUG-logged so one bad subscriber cannot kill the decode loop.
- PymavlinkInboundDecoder: pymavlink-based AP decoder for RAW_IMU,
  SCALED_IMU2, ATTITUDE, GPS_RAW_INT, GPS2_RAW, HEARTBEAT, STATUSTEXT.
  Out-of-order drop (Invariant 7) per-kind WARN. STATUSTEXT spoofing
  sentinel promotes subsequent GPS to GpsStatus.SPOOFED within 5 s.
  AC-5.1 warm-start hint cached on first 3D+ fix; embedded into
  every FlightStateSignal.
- Msp2InavInboundDecoder: YAMSPy-based iNav polling decoder for IMU /
  attitude / GPS / flight-state. signed=False always (RESTRICT-COMM-2);
  GpsStatus.SPOOFED is unreachable on iNav.

Adds yamspy>=0.3.3 + pyserial>=3.5 to pyproject.toml.

Tests: 443 pass / 2 skip / 0 fail (+33 in batch 9).

Contract: no drift on fc_adapter_protocol.md v1.0.0; this batch
implements the inbound producer side without changing signatures.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 04:28:14 +03:00
parent 362e93c626
commit a61d2d3f4b
10 changed files with 1708 additions and 1 deletions
@@ -0,0 +1,355 @@
"""AP MAVLink 2.0 inbound decoder (AZ-391 / E-C8).
Reads frames from a pymavlink-style source (``recv_match`` interface),
translates the five contract message types to :class:`FcTelemetryFrame`
records, maintains the per-kind rings, and dispatches to subscribers.
Wire types we decode:
- ``RAW_IMU`` -> ``ImuTelemetrySample`` (kind=IMU_SAMPLE)
- ``ATTITUDE`` -> ``AttitudeSample`` (kind=ATTITUDE)
- ``GPS_RAW_INT`` -> ``GpsHealth`` (kind=GPS_HEALTH)
- ``HEARTBEAT`` -> ``FlightStateSignal`` (kind=MAV_STATE)
- ``STATUSTEXT`` -> spoofing-sentinel sideband (consumed internally)
The decoder is fed via an injected ``MAVLinkSource`` (Protocol with
``recv_match(...) -> message | None``), so tests can drive it
synchronously without a real serial port.
"""
from __future__ import annotations
import threading
import time
from typing import Any, Final, Protocol
from gps_denied_onboard._types.fc import (
AttitudeSample,
FcTelemetryFrame,
FlightState,
FlightStateSignal,
GpsHealth,
GpsStatus,
ImuTelemetrySample,
TelemetryKind,
)
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus
from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing
from gps_denied_onboard.logging import get_logger
__all__ = [
"AP_MESSAGE_TYPES",
"MAVLinkSource",
"PymavlinkInboundDecoder",
]
AP_MESSAGE_TYPES: Final[tuple[str, ...]] = (
"RAW_IMU",
"SCALED_IMU2",
"ATTITUDE",
"GPS_RAW_INT",
"GPS2_RAW",
"HEARTBEAT",
"STATUSTEXT",
)
# MAVLink GPS_FIX_TYPE enum values (subset we map).
_FIX_TYPE_NO_FIX_OR_NONE: Final[frozenset[int]] = frozenset({0, 1})
_FIX_TYPE_2D: Final[int] = 2
# MAV_STATE values (subset we map).
_MAV_STATE_UNINIT: Final[int] = 0
_MAV_STATE_BOOT: Final[int] = 1
_MAV_STATE_CALIBRATING: Final[int] = 2
_MAV_STATE_STANDBY: Final[int] = 3
_MAV_STATE_ACTIVE: Final[int] = 4
_MAV_STATE_CRITICAL: Final[int] = 5
_MAV_STATE_EMERGENCY: Final[int] = 6
_MAV_STATE_POWEROFF: Final[int] = 7
_MAV_STATE_FLIGHT_TERMINATION: Final[int] = 8
# `base_mode` bit that signals MAV_MODE_FLAG_SAFETY_ARMED.
_MAV_MODE_FLAG_SAFETY_ARMED: Final[int] = 0x80
# Substrings in STATUSTEXT that promote subsequent GPS_RAW_INT decodes to SPOOFED.
_SPOOFING_SENTINELS: Final[tuple[str, ...]] = (
"GPS spoofing",
"GPS jamming",
)
_OUT_OF_ORDER_KIND: Final[str] = "c8.inbound.out_of_order_frame_dropped"
_DECODE_ERROR_KIND: Final[str] = "c8.inbound.decode_error"
_SPOOFING_DETECTED_KIND: Final[str] = "c8.inbound.spoofing_sentinel_seen"
class MAVLinkSource(Protocol):
"""Minimal pymavlink-mavutil interface the decoder consumes."""
def recv_match(
self,
*,
type: str | list[str] | None = None,
blocking: bool = False,
timeout: float | None = None,
) -> Any | None: ...
class PymavlinkInboundDecoder:
"""Per-AP-adapter MAVLink 2.0 inbound decoder + dispatcher."""
def __init__(
self,
source: MAVLinkSource,
bus: SubscriptionBus,
*,
imu_ring_capacity: int = 200,
attitude_ring_capacity: int = 100,
gps_ring_capacity: int = 20,
state_ring_capacity: int = 10,
) -> None:
self._source = source
self._bus = bus
self._log = get_logger("c8_fc_adapter.inbound_mavlink")
self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
imu_ring_capacity, kind_name="imu"
)
self.attitude_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
attitude_ring_capacity, kind_name="attitude"
)
self.gps_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
gps_ring_capacity, kind_name="gps_health"
)
self.state_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
state_ring_capacity, kind_name="flight_state"
)
self._last_ts_ns: dict[TelemetryKind, int] = {}
self._spoof_sentinel_seen_at: int | None = None
# AC-5.1 warm-start hint: cached on first GPS_RAW_INT with 3D+ fix.
self._warm_start_hint: LatLonAlt | None = None
self._warm_start_hint_at: int | None = None
self._stop_flag = threading.Event()
self._lock = threading.Lock()
@property
def warm_start_hint(self) -> LatLonAlt | None:
with self._lock:
return self._warm_start_hint
def feed_one_message(self, msg: Any) -> bool:
"""Decode one MAVLink message synchronously (test-friendly entrypoint).
Returns ``True`` if a frame was dispatched, ``False`` on drop /
unknown type / decode error. Production use prefers
:meth:`run_decode_loop` which pulls from ``source.recv_match``.
"""
if msg is None:
return False
try:
msg_type = msg.get_type() if hasattr(msg, "get_type") else type(msg).__name__
except Exception as exc:
self._log_decode_error("get_type_failed", repr(exc))
return False
try:
if msg_type in ("RAW_IMU", "SCALED_IMU2"):
return self._handle_imu(msg)
if msg_type == "ATTITUDE":
return self._handle_attitude(msg)
if msg_type in ("GPS_RAW_INT", "GPS2_RAW"):
return self._handle_gps(msg)
if msg_type == "HEARTBEAT":
return self._handle_heartbeat(msg)
if msg_type == "STATUSTEXT":
self._handle_statustext(msg)
return False
except Exception as exc:
self._log_decode_error(msg_type, repr(exc))
return False
return False
def run_decode_loop(self, *, poll_timeout_s: float = 0.5) -> None:
"""Continuous decode loop; honours :meth:`stop`."""
while not self._stop_flag.is_set():
try:
msg = self._source.recv_match(
type=list(AP_MESSAGE_TYPES),
blocking=True,
timeout=poll_timeout_s,
)
except Exception as exc:
self._log_decode_error("recv_match_failed", repr(exc))
continue
self.feed_one_message(msg)
def stop(self) -> None:
self._stop_flag.set()
# ------------------------------------------------------------------
# Per-type handlers
def _handle_imu(self, msg: Any) -> bool:
# pymavlink RAW_IMU exposes raw signed-16 ticks; we forward as-is.
accel = (float(msg.xacc), float(msg.yacc), float(msg.zacc))
gyro = (float(msg.xgyro), float(msg.ygyro), float(msg.zgyro))
# `time_usec` is FC-side; we override with monotonic_ns at decode boundary
# for the FdrTelemetryFrame.received_at; the payload's ts_ns is the
# FC sensor stamp (epoch us -> ns).
sensor_ts_ns = int(msg.time_usec) * 1000
payload = ImuTelemetrySample(
ts_ns=sensor_ts_ns,
accel_xyz=accel,
gyro_xyz=gyro,
)
return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring)
def _handle_attitude(self, msg: Any) -> bool:
sensor_ts_ns = int(msg.time_boot_ms) * 1_000_000
payload = AttitudeSample(
ts_ns=sensor_ts_ns,
roll_rad=float(msg.roll),
pitch_rad=float(msg.pitch),
yaw_rad=float(msg.yaw),
)
return self._dispatch(TelemetryKind.ATTITUDE, payload, ring=self.attitude_ring)
def _handle_gps(self, msg: Any) -> bool:
fix_type = int(msg.fix_type)
status = self._map_fix_type(fix_type)
if status is GpsStatus.STABLE:
status = self._maybe_promote_to_spoofed_or_non_spoofed()
captured_at = time.monotonic_ns()
payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at)
# AC-5.1: cache warm-start hint on first 3D+ fix.
if fix_type >= 3:
lat_deg = int(msg.lat) / 1e7
lon_deg = int(msg.lon) / 1e7
alt_m = int(msg.alt) / 1000.0
with self._lock:
if self._warm_start_hint is None:
self._warm_start_hint = LatLonAlt(lat_deg, lon_deg, alt_m)
self._warm_start_hint_at = captured_at
return self._dispatch(TelemetryKind.GPS_HEALTH, payload, ring=self.gps_ring)
def _handle_heartbeat(self, msg: Any) -> bool:
captured_at = time.monotonic_ns()
state = self._map_mav_state(
system_status=int(msg.system_status),
base_mode=int(msg.base_mode),
)
with self._lock:
hint = self._warm_start_hint
hint_at = self._warm_start_hint_at
last_age_ms: int | None = None
if hint_at is not None:
last_age_ms = max(0, (captured_at - hint_at) // 1_000_000)
payload = FlightStateSignal(
state=state,
last_valid_gps_hint_wgs84=hint,
last_valid_gps_age_ms=last_age_ms,
captured_at=captured_at,
)
return self._dispatch(TelemetryKind.MAV_STATE, payload, ring=self.state_ring)
def _handle_statustext(self, msg: Any) -> None:
text = msg.text
if isinstance(text, bytes):
text = text.decode("utf-8", errors="replace")
if not any(sentinel.lower() in text.lower() for sentinel in _SPOOFING_SENTINELS):
return
captured_at = time.monotonic_ns()
with self._lock:
self._spoof_sentinel_seen_at = captured_at
self._log.warning(
f"c8.inbound.spoofing_sentinel_seen: {text!r}",
extra={
"kind": _SPOOFING_DETECTED_KIND,
"kv": {"text": text, "captured_at": captured_at},
},
)
# ------------------------------------------------------------------
# Dispatch helpers
def _dispatch(
self,
kind: TelemetryKind,
payload: Any,
*,
ring: TelemetryRing[FcTelemetryFrame],
) -> bool:
received_at = time.monotonic_ns()
last = self._last_ts_ns.get(kind)
if last is not None and received_at <= last:
self._log.warning(
f"c8.inbound.out_of_order_frame_dropped: kind={kind.name} "
f"prev_ns={last} this_ns={received_at}",
extra={
"kind": _OUT_OF_ORDER_KIND,
"kv": {
"telemetry_kind": kind.name,
"prev_ns": last,
"this_ns": received_at,
},
},
)
return False
self._last_ts_ns[kind] = received_at
frame = FcTelemetryFrame(
kind=kind,
payload=payload,
received_at=received_at,
signed=True, # AP path uses MAVLink 2.0 signing per AZ-395 (assumed true)
)
ring.push(frame)
self._bus.dispatch(frame)
return True
def _log_decode_error(self, msg_type: str, error_repr: str) -> None:
self._log.debug(
f"c8.inbound.decode_error: msg_type={msg_type} {error_repr}",
extra={
"kind": _DECODE_ERROR_KIND,
"kv": {"msg_type": msg_type, "error": error_repr},
},
)
@staticmethod
def _map_fix_type(fix_type: int) -> GpsStatus:
if fix_type in _FIX_TYPE_NO_FIX_OR_NONE:
return GpsStatus.NO_FIX
if fix_type == _FIX_TYPE_2D:
return GpsStatus.DEGRADED
return GpsStatus.STABLE
def _maybe_promote_to_spoofed_or_non_spoofed(self) -> GpsStatus:
# AC-3: AP has no in-band signed-fix flag; we use STATUSTEXT
# spoofing sentinels seen within the last 5 s as a SPOOFED signal.
with self._lock:
sentinel_at = self._spoof_sentinel_seen_at
if sentinel_at is None:
return GpsStatus.STABLE
now = time.monotonic_ns()
if (now - sentinel_at) <= 5 * 1_000_000_000:
return GpsStatus.SPOOFED
return GpsStatus.STABLE
@staticmethod
def _map_mav_state(*, system_status: int, base_mode: int) -> FlightState:
if system_status in (_MAV_STATE_UNINIT, _MAV_STATE_BOOT, _MAV_STATE_CALIBRATING):
return FlightState.INIT
if system_status in (
_MAV_STATE_CRITICAL,
_MAV_STATE_EMERGENCY,
_MAV_STATE_FLIGHT_TERMINATION,
):
return FlightState.FAILED
if system_status == _MAV_STATE_ACTIVE:
return FlightState.IN_FLIGHT
if system_status == _MAV_STATE_STANDBY:
if base_mode & _MAV_MODE_FLAG_SAFETY_ARMED:
return FlightState.ARMED
return FlightState.ON_GROUND
if system_status == _MAV_STATE_POWEROFF:
return FlightState.ON_GROUND
return FlightState.INIT
@@ -0,0 +1,270 @@
"""iNav MSP2 inbound decoder (AZ-391 / E-C8).
MSP2 is request/response over a serial transport (YAMSPy ``MSPy``);
unlike MAVLink it doesn't push telemetry. The decoder runs a poll
loop that calls into a thin ``MspSource`` adapter and translates the
returned dictionaries into the unified :class:`FcTelemetryFrame`
shape (same payload types as the AP path so consumers don't care
which FC the wire originated from).
iNav has no spoofing-detection telemetry (RESTRICT-COMM-2 +
spec AC-5); ``GpsStatus.SPOOFED`` is unreachable on this path.
Tests drive the decoder via :meth:`feed_one_tick` which calls the
``MspSource`` poll methods once — no real serial port required.
"""
from __future__ import annotations
import threading
import time
from typing import Any, Final, Protocol
from gps_denied_onboard._types.fc import (
AttitudeSample,
FcTelemetryFrame,
FlightState,
FlightStateSignal,
GpsHealth,
GpsStatus,
ImuTelemetrySample,
TelemetryKind,
)
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus
from gps_denied_onboard.components.c8_fc_adapter._telemetry_rings import TelemetryRing
from gps_denied_onboard.logging import get_logger
__all__ = [
"Msp2InavInboundDecoder",
"MspSource",
]
_OUT_OF_ORDER_KIND: Final[str] = "c8.inbound.out_of_order_frame_dropped"
_DECODE_ERROR_KIND: Final[str] = "c8.inbound.decode_error"
class MspSource(Protocol):
"""Minimal MSPy-shaped interface the decoder polls.
Concrete production implementation wraps a ``yamspy.MSPy``
connection; the test harness uses a deterministic dict-returning
fake.
"""
def read_imu(self) -> dict[str, Any]: ...
def read_attitude(self) -> dict[str, Any]: ...
def read_gps(self) -> dict[str, Any]: ...
def read_flight_state(self) -> dict[str, Any]: ...
class Msp2InavInboundDecoder:
"""iNav MSP2 polling decoder + dispatcher."""
def __init__(
self,
source: MspSource,
bus: SubscriptionBus,
*,
imu_ring_capacity: int = 200,
attitude_ring_capacity: int = 100,
gps_ring_capacity: int = 20,
state_ring_capacity: int = 10,
) -> None:
self._source = source
self._bus = bus
self._log = get_logger("c8_fc_adapter.inbound_msp2")
self.imu_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
imu_ring_capacity, kind_name="imu"
)
self.attitude_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
attitude_ring_capacity, kind_name="attitude"
)
self.gps_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
gps_ring_capacity, kind_name="gps_health"
)
self.state_ring: TelemetryRing[FcTelemetryFrame] = TelemetryRing(
state_ring_capacity, kind_name="flight_state"
)
self._last_ts_ns: dict[TelemetryKind, int] = {}
self._warm_start_hint: LatLonAlt | None = None
self._warm_start_hint_at: int | None = None
self._stop_flag = threading.Event()
self._lock = threading.Lock()
@property
def warm_start_hint(self) -> LatLonAlt | None:
with self._lock:
return self._warm_start_hint
def feed_one_tick(self) -> int:
"""Poll source once for IMU/attitude/GPS/state; returns # dispatched frames.
IMU and attitude are polled every tick; GPS + state can be
polled at a lower rate by the production driver — we still
poll them here for AC simplicity.
"""
dispatched = 0
for handler in (self._tick_imu, self._tick_attitude, self._tick_gps, self._tick_state):
try:
if handler():
dispatched += 1
except Exception as exc:
self._log_decode_error(handler.__name__, repr(exc))
return dispatched
def run_poll_loop(self, *, period_s: float = 0.01) -> None:
"""Continuous polling loop; honours :meth:`stop`."""
while not self._stop_flag.is_set():
self.feed_one_tick()
time.sleep(period_s)
def stop(self) -> None:
self._stop_flag.set()
# ------------------------------------------------------------------
# Per-tick handlers
def _tick_imu(self) -> bool:
raw = self._source.read_imu()
if raw is None:
return False
# iNav MSP2 IMU dict: 'accelerometer': [x, y, z], 'gyroscope': [x, y, z]
# plus a 'time_ms' if the source surfaces it (YAMSPy doesn't include
# FC-side time, so we use monotonic_ns at decode boundary).
accel = tuple(float(v) for v in raw["accelerometer"])
gyro = tuple(float(v) for v in raw["gyroscope"])
if len(accel) != 3 or len(gyro) != 3:
raise ValueError(
f"iNav IMU dict shape: expected 3-vectors, got accel={accel}, gyro={gyro}"
)
sensor_ts_ns = time.monotonic_ns()
payload = ImuTelemetrySample(ts_ns=sensor_ts_ns, accel_xyz=accel, gyro_xyz=gyro)
return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, ring=self.imu_ring)
def _tick_attitude(self) -> bool:
raw = self._source.read_attitude()
if raw is None:
return False
# iNav attitude dict: 'angx' (roll), 'angy' (pitch), 'heading' (yaw) — in degrees
# The MSP_ATTITUDE message returns: roll/pitch as 10ths of a deg,
# heading as -180..180 deg. We accept the YAMSPy-decoded dict shape
# which uses the same keys.
roll_rad = float(raw["angx"]) * (3.141592653589793 / 180.0)
pitch_rad = float(raw["angy"]) * (3.141592653589793 / 180.0)
yaw_rad = float(raw["heading"]) * (3.141592653589793 / 180.0)
sensor_ts_ns = time.monotonic_ns()
payload = AttitudeSample(
ts_ns=sensor_ts_ns,
roll_rad=roll_rad,
pitch_rad=pitch_rad,
yaw_rad=yaw_rad,
)
return self._dispatch(TelemetryKind.ATTITUDE, payload, ring=self.attitude_ring)
def _tick_gps(self) -> bool:
raw = self._source.read_gps()
if raw is None:
return False
# iNav MSP_RAW_GPS / MSP2_INAV_GPS dict:
# 'fix' (0/1/2), 'numSat', 'lat' (deg * 1e7), 'lon' (deg * 1e7), 'alt' (m)
# We translate iNav's coarser fix flag (0=no fix, 1=2D, 2+=3D) to our enum.
fix = int(raw["fix"])
if fix == 0:
status = GpsStatus.NO_FIX
elif fix == 1:
status = GpsStatus.DEGRADED
else:
status = GpsStatus.STABLE
captured_at = time.monotonic_ns()
if fix >= 2:
lat_deg = float(raw["lat"]) / 1e7
lon_deg = float(raw["lon"]) / 1e7
alt_m = float(raw.get("alt", 0.0))
with self._lock:
if self._warm_start_hint is None:
self._warm_start_hint = LatLonAlt(lat_deg, lon_deg, alt_m)
self._warm_start_hint_at = captured_at
payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at)
return self._dispatch(TelemetryKind.GPS_HEALTH, payload, ring=self.gps_ring)
def _tick_state(self) -> bool:
raw = self._source.read_flight_state()
if raw is None:
return False
# iNav flight-state dict shape (subset we honour):
# 'armed': bool, 'in_flight': bool, 'failsafe': bool
captured_at = time.monotonic_ns()
if raw.get("failsafe", False):
state = FlightState.FAILED
elif raw.get("in_flight", False):
state = FlightState.IN_FLIGHT
elif raw.get("armed", False):
state = FlightState.ARMED
elif raw.get("initialised", False):
state = FlightState.ON_GROUND
else:
state = FlightState.INIT
with self._lock:
hint = self._warm_start_hint
hint_at = self._warm_start_hint_at
last_age_ms: int | None = None
if hint_at is not None:
last_age_ms = max(0, (captured_at - hint_at) // 1_000_000)
payload = FlightStateSignal(
state=state,
last_valid_gps_hint_wgs84=hint,
last_valid_gps_age_ms=last_age_ms,
captured_at=captured_at,
)
return self._dispatch(TelemetryKind.MAV_STATE, payload, ring=self.state_ring)
# ------------------------------------------------------------------
# Dispatch helpers
def _dispatch(
self,
kind: TelemetryKind,
payload: Any,
*,
ring: TelemetryRing[FcTelemetryFrame],
) -> bool:
received_at = time.monotonic_ns()
last = self._last_ts_ns.get(kind)
if last is not None and received_at <= last:
self._log.warning(
f"c8.inbound.out_of_order_frame_dropped: kind={kind.name} "
f"prev_ns={last} this_ns={received_at}",
extra={
"kind": _OUT_OF_ORDER_KIND,
"kv": {
"telemetry_kind": kind.name,
"prev_ns": last,
"this_ns": received_at,
},
},
)
return False
self._last_ts_ns[kind] = received_at
frame = FcTelemetryFrame(
kind=kind,
payload=payload,
received_at=received_at,
signed=False, # iNav has no signing per RESTRICT-COMM-2
)
ring.push(frame)
self._bus.dispatch(frame)
return True
def _log_decode_error(self, handler_name: str, error_repr: str) -> None:
self._log.debug(
f"c8.inbound.decode_error: handler={handler_name} {error_repr}",
extra={
"kind": _DECODE_ERROR_KIND,
"kv": {"handler": handler_name, "error": error_repr},
},
)
@@ -0,0 +1,96 @@
"""Multi-subscriber telemetry fan-out (AZ-391 / E-C8).
The single decode thread per adapter pushes :class:`FcTelemetryFrame`
into a :class:`SubscriptionBus`; the bus fans it out synchronously to
every registered callback. Cancelling a subscription removes the
callback from the next dispatch tick.
Why synchronous fan-out and not per-subscriber threads:
- ``FcAdapter`` callers (C1 VIO, C5 StateEstimator) own their own
decode-side ring buffers; the bus's responsibility is dispatch only.
- The contract says callbacks fire ON the decoder thread (Invariant
8). Adding queue/thread per subscriber would (a) increase latency,
(b) hide cross-callback ordering bugs, (c) require shutdown-order
guarantees we don't actually need.
Slow subscribers MUST not block the decode loop beyond 100 µs per
the AZ-391 constraint. The bus does NOT enforce this — subscribers
are trusted to use a non-blocking enqueue + drain on their own
thread. (See `_inbound_mavlink._fire_callbacks` for the catch +
DEBUG log path that prevents a subscriber crash from killing the
decode loop.)
"""
from __future__ import annotations
import itertools
import threading
from typing import Final
from gps_denied_onboard._types.fc import FcTelemetryFrame, TelemetryCallback
from gps_denied_onboard.logging import get_logger
__all__ = ["SubscriptionBus", "SubscriptionHandle"]
_CALLBACK_ERROR_KIND: Final[str] = "c8.inbound.callback_error"
class SubscriptionHandle:
"""Concrete `Subscription` Protocol implementation."""
def __init__(self, bus: SubscriptionBus, sub_id: int) -> None:
self._bus = bus
self._sub_id = sub_id
def cancel(self) -> None:
self._bus._cancel(self._sub_id)
class SubscriptionBus:
"""Lock-guarded subscriber registry + fan-out dispatch."""
def __init__(self) -> None:
self._lock = threading.Lock()
self._subscribers: dict[int, TelemetryCallback] = {}
self._next_id = itertools.count(1)
self._log = get_logger("c8_fc_adapter.subscription_bus")
def subscribe(self, callback: TelemetryCallback) -> SubscriptionHandle:
with self._lock:
sub_id = next(self._next_id)
self._subscribers[sub_id] = callback
return SubscriptionHandle(self, sub_id)
def _cancel(self, sub_id: int) -> None:
with self._lock:
self._subscribers.pop(sub_id, None)
@property
def subscriber_count(self) -> int:
with self._lock:
return len(self._subscribers)
def dispatch(self, frame: FcTelemetryFrame) -> None:
"""Fan ``frame`` out to every currently-registered subscriber.
Subscriber exceptions are caught + DEBUG-logged so one bad
subscriber cannot kill the decode loop (Invariant 8). The
callback list is snapshot under the lock; callbacks fire OUTSIDE
the lock so a callback that calls back into ``subscribe`` /
``cancel`` does not deadlock.
"""
with self._lock:
callbacks = list(self._subscribers.items())
for sub_id, cb in callbacks:
try:
cb(frame)
except Exception as exc:
self._log.debug(
f"c8.inbound.callback_error: sub_id={sub_id} {exc!r}",
extra={
"kind": _CALLBACK_ERROR_KIND,
"kv": {"sub_id": sub_id, "error": repr(exc)},
},
)
@@ -0,0 +1,112 @@
"""Bounded per-kind telemetry rings (AZ-391 / E-C8).
Single decode thread pushes; arbitrary number of consumer threads
``peek`` (lock-free read of the latest sample) and ``snapshot`` (copy
of the entire ring under the lock). Drop-oldest on overflow — emits a
single INFO log at the FIRST overflow per ring instance (subsequent
overflows are counted but not logged to avoid drowning the log).
Why a custom ring and not :class:`collections.deque(maxlen=N)`:
- We need a single INFO log on the FIRST overflow event, plus a
monotonic dropped-counter that survives across overflows.
- ``deque`` push is GIL-fast but the dropped-counter would still need
a separate lock; consolidating both behind a single ring is simpler
and gives ``peek_latest`` for free as O(1).
The ring is thread-safe at the Python level (one lock guards both
``push`` and ``snapshot``). Decoder callbacks fire OUTSIDE the ring
lock per Invariant 8.
"""
from __future__ import annotations
import threading
from collections import deque
from collections.abc import Iterable
from typing import Final, Generic, TypeVar
from gps_denied_onboard.logging import get_logger
__all__ = ["TelemetryRing"]
T = TypeVar("T")
_OVERFLOW_KIND: Final[str] = "c8.inbound.ring_overflow"
class TelemetryRing(Generic[T]):
"""Bounded drop-oldest ring buffer for one telemetry kind."""
def __init__(self, capacity: int, *, kind_name: str) -> None:
if capacity <= 0:
raise ValueError(f"TelemetryRing capacity must be > 0; got {capacity}")
self._capacity = capacity
self._kind_name = kind_name
self._lock = threading.Lock()
self._items: deque[T] = deque(maxlen=capacity)
self._dropped_count = 0
self._overflow_logged = False
self._log = get_logger("c8_fc_adapter.telemetry_ring")
@property
def capacity(self) -> int:
return self._capacity
@property
def kind_name(self) -> str:
return self._kind_name
@property
def dropped_count(self) -> int:
with self._lock:
return self._dropped_count
def push(self, item: T) -> bool:
"""Push ``item`` (drop-oldest on overflow). Returns ``False`` on drop."""
with self._lock:
if len(self._items) >= self._capacity:
self._dropped_count += 1
should_log_first = not self._overflow_logged
self._items.append(item)
if should_log_first:
self._overflow_logged = True
else:
self._items.append(item)
should_log_first = False
if should_log_first:
self._log.info(
f"c8.inbound.ring_overflow: kind={self._kind_name} capacity={self._capacity}",
extra={
"kind": _OVERFLOW_KIND,
"kv": {
"ring_kind": self._kind_name,
"capacity": self._capacity,
},
},
)
return False
return True
def peek_latest(self) -> T | None:
"""Return the most-recently-pushed item, or None if empty."""
with self._lock:
if not self._items:
return None
return self._items[-1]
def snapshot(self) -> list[T]:
"""Return a shallow copy of the ring's contents (oldest -> newest)."""
with self._lock:
return list(self._items)
def __len__(self) -> int:
with self._lock:
return len(self._items)
def extend(self, items: Iterable[T]) -> None:
"""Push multiple items; per-item drop accounting still applies."""
for item in items:
self.push(item)