diff --git a/_docs/02_tasks/todo/AZ-396_c8_source_set_switch.md b/_docs/02_tasks/done/AZ-396_c8_source_set_switch.md similarity index 100% rename from _docs/02_tasks/todo/AZ-396_c8_source_set_switch.md rename to _docs/02_tasks/done/AZ-396_c8_source_set_switch.md diff --git a/_docs/02_tasks/todo/AZ-397_c8_qgc_telemetry_adapter.md b/_docs/02_tasks/done/AZ-397_c8_qgc_telemetry_adapter.md similarity index 100% rename from _docs/02_tasks/todo/AZ-397_c8_qgc_telemetry_adapter.md rename to _docs/02_tasks/done/AZ-397_c8_qgc_telemetry_adapter.md diff --git a/_docs/03_implementation/batch_11_cycle1_report.md b/_docs/03_implementation/batch_11_cycle1_report.md new file mode 100644 index 0000000..7cc8a8b --- /dev/null +++ b/_docs/03_implementation/batch_11_cycle1_report.md @@ -0,0 +1,70 @@ +# Batch 11 — Cycle 1 Implementation Report + +**Batch**: 11 of N +**Tasks landed**: AZ-396 (D-C8-2 source-set switch + spoof-recovery sink) + AZ-397 (QgcTelemetryAdapter) +**Cycle**: 1 +**Date**: 2026-05-11 + +## Scope + +| Task | Component | Purpose | +|------|-----------|---------| +| AZ-396 | C8 FC adapter (AP) + runtime root | `PymavlinkArdupilotAdapter.request_source_set_switch` body — sends `MAV_CMD_SET_EKF_SOURCE_SET` (ardupilotmega cmd 42007) with `param1 = config.fc.spoof_recovery_source_set`, waits for `COMMAND_ACK` up to `config.fc.source_set_switch_timeout_ms`, idempotence per Invariant 11 (rate-limit within 1 s + skip after success). `runtime_root.SpoofRecoverySink` provides a bounded-queue thread sink that consumes C5's spoof-promotion-recovered signal (AZ-385 publisher side; future task) and dispatches the source-set switch from a single dedicated thread (Invariant 8 preserved). | +| AZ-397 | C8 GCS adapter | `QgcTelemetryAdapter` — concrete `GcsAdapter` strategy. Open/close a MAVLink 2.0 channel to QGC on the configured UART; `emit_summary` downsamples 5 Hz → `summary_rate_hz` via modulo arithmetic and emits `GLOBAL_POSITION_INT` + `NAMED_VALUE_FLOAT("horiz_m")`; `emit_status_text` mirrors a STATUSTEXT to the GCS link; `subscribe_operator_commands` registers a pymavlink message-hook that translates `COMMAND_LONG` / `PARAM_REQUEST_*` / `REQUEST_DATA_STREAM` / `MISSION_*` / `SET_MODE` into `OperatorCommand` DTOs and writes every receipt to FDR (`kind="c8.gcs.operator_command"`) as the audit trail. | + +## Files added / modified + +### Added (prod) + +- `src/gps_denied_onboard/runtime_root/spoof_recovery_sink.py` — `SpoofRecoveryPublisher` Protocol + `SpoofRecoverySink` (dispatch thread, bounded queue, error isolation). +- `src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py` — `QgcTelemetryAdapter` + `_compute_downsample_modulo` helper. + +### Added (tests) + +- `tests/unit/c8_fc_adapter/test_az396_source_set_switch.py` — 12 AC tests (10 ACs + 2 sink-isolation cases). +- `tests/unit/c8_fc_adapter/test_az397_qgc_telemetry.py` — 13 AC tests (10 ACs + parametrised rate cases). + +### Modified (prod) + +- `src/gps_denied_onboard/_types/fc.py` — added `FcKind.GCS_QGC` enum value (additive; lets `PortConfig.fc_kind` discriminate the GCS link variant alongside AP/iNav). +- `src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py` — `request_source_set_switch` body (AZ-396) replaces the NotImplementedError placeholder; new helpers `_wait_for_command_ack` and `_handle_source_set_switch_failure`; ctor fields `_last_switch_attempt_ns` + `_last_switch_succeeded`. +- `src/gps_denied_onboard/config/schema.py` — `FcConfig` extended with `spoof_recovery_source_set: int = 1` + `source_set_switch_timeout_ms: int = 1500` (defaults align with AZ-396 spec); `GcsConfig.summary_rate_hz` valid range widened from `[1.0, 2.0]` to `[0.5, 5.0]` per AZ-397 AC-10. +- `src/gps_denied_onboard/config/loader.py` — `ENV_KEY_MAP` extended with `FC_SPOOF_RECOVERY_SOURCE_SET` + `FC_SOURCE_SET_SWITCH_TIMEOUT_MS`; `_FIELD_COERCIONS` extended with the two new int fields. +- `src/gps_denied_onboard/runtime_root/__init__.py` — re-exports `SpoofRecoveryPublisher` + `SpoofRecoverySink`. + +### Modified (tests) + +- `tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py` — updated `test_ac3_fc_kind_has_two_members` to reflect the additive `GCS_QGC` enum value; updated `test_gcs_summary_rate_out_of_range_rejected` boundaries (`5.1` / `0.4`) to match the new valid range. +- `tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py` — AC-9 placeholder check (`NotImplementedError`) replaced by a callability check; the real source-set-switch behaviour now lives under AZ-396 tests. + +## Contract changes + +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` — **unchanged at v1.0.0**. +- `_docs/02_document/contracts/shared_config/composition_root_protocol.md` — **unchanged at v1.2.0**. Additive `FcConfig` fields + valid-range relaxation for `GcsConfig.summary_rate_hz` (existing configs remain valid). + +## Test counts + +| Metric | Before | After | Delta | +|--------|--------|-------|-------| +| Tests passing | 476 | 501 | +25 | +| Tests skipped | 2 | 2 | 0 | +| Tests failing | 0 | 0 | 0 | + +## Architectural notes + +- **Source-set switch threading**: the AP adapter's `request_source_set_switch` enforces single-writer via the same `_enforce_single_writer` used by `emit_external_position`. The `SpoofRecoverySink` dispatch thread is the documented single writer; the C5 publisher thread calls `sink.publish()` which only enqueues. The bounded queue guarantees the publisher never blocks on UART. +- **`_wait_for_command_ack` filters by `type="COMMAND_ACK"`** — necessary because the inbound MAVLink decoder thread (AZ-391) also calls `recv_match` on the same connection. Real pymavlink routes by type internally; the unit-test stub explicitly mirrors that behaviour so the test-vs-production gap stays small. +- **GCS adapter downsampling**: `_compute_downsample_modulo = round(_COMPOSE_ROOT_INVOKE_HZ / summary_rate_hz)`. Tests pin the exact integer modulo so any future change to `_COMPOSE_ROOT_INVOKE_HZ` shows up as a deliberate AC update. The modulo is computed once at construction; updates to `summary_rate_hz` require a reopen. +- **Operator command dispatch**: pymavlink's message-hook list is the canonical inbound-routing seam. `_ensure_operator_handler_attached` is idempotent — multiple `subscribe_operator_commands` calls share a single hook; subscriber fan-out happens via the `SubscriptionBus` (C8 inbound pattern, AZ-391). Subscriber crashes are isolated by the bus. +- **`FcKind.GCS_QGC`**: pragmatic compromise to keep `PortConfig` single-typed across FC and GCS variants. A future `LinkKind` superclass refactor would split the two; documented in the AZ-390 follow-up. + +## Dependencies introduced + +- None. + +## Known forward-actions + +1. **AZ-385 (C5 spoof-promotion gate)** publishes the recovery signal the `SpoofRecoverySink` consumes. The composition root wires the two with one call: `publisher.subscribe_spoof_promotion_recovered(sink.publish)`. Until AZ-385 lands, the sink is constructed but inactive. +2. **`_SWITCH_RATE_LIMIT_S` and the GCS NAMED_VALUE_FLOAT name (`"horiz_m"`)** — both module-level constants; promotion to config + contract bump deferred. +3. **`global_position_int` relative-altitude placeholder** — currently mirrors absolute altitude; the runtime root must surface the launch reference frame before the relative field is meaningful. +4. **C8-IT-07 SITL end-to-end** (3 s spoofing-promotion latency) — deferred to E-BBT; this batch delivers the wire surface for that gate. diff --git a/_docs/03_implementation/reviews/batch_11_review.md b/_docs/03_implementation/reviews/batch_11_review.md new file mode 100644 index 0000000..d61f395 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_11_review.md @@ -0,0 +1,109 @@ +# Batch 11 — Code Review + +**Batch**: 11 of N +**Tasks**: AZ-396 (D-C8-2 source-set switch + spoof-recovery sink) + AZ-397 (QgcTelemetryAdapter) +**Reviewer**: autodev (7-phase) +**Verdict**: **PASS_WITH_INFO** +**Date**: 2026-05-11 + +## Scope + +| Task | Files touched (prod) | Files touched (tests) | +|------|----------------------|------------------------| +| AZ-396 | `components/c8_fc_adapter/pymavlink_ardupilot_adapter.py` (extension), `config/schema.py`, `config/loader.py`, `runtime_root/__init__.py`, `runtime_root/spoof_recovery_sink.py` | `tests/unit/c8_fc_adapter/test_az396_source_set_switch.py`, `tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py` (AC-9 update) | +| AZ-397 | `components/c8_fc_adapter/mavlink_gcs_adapter.py`, `_types/fc.py` (GCS_QGC enum), `config/schema.py` (range widening) | `tests/unit/c8_fc_adapter/test_az397_qgc_telemetry.py`, `tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py` (FcKind members + GcsConfig range adjustments) | + +## Phase 1 — AC compliance + +### AZ-396 — 10 ACs (12 tests including 2 sink-isolation cases) + +| AC | Coverage | +|----|----------| +| AC-1 ACK success → INFO log + FDR + STATUSTEXT | `test_ac1_ack_success` | +| AC-2 non-success ACK → SourceSetSwitchError + ERROR log + STATUSTEXT(ERROR) | `test_ac2_non_success_ack_raises` | +| AC-3 ACK timeout (1500ms default) | `test_ac3_ack_timeout_raises` | +| AC-4 Configurable timeout | `test_ac4_configurable_timeout_uses_config_value` (500 ms) | +| AC-5 Idempotence within 1 s — rate-limited | `test_ac5_idempotence_within_1s_rate_limited` (1 wire call after 2 invokes) | +| AC-6 Idempotence after success — no re-issue | `test_ac6_idempotence_after_success_no_reissue` | +| AC-7 Runtime-root signal triggers switch | `test_ac7_spoof_recovery_sink_triggers_switch` + isolation case | +| AC-8 source_set_id from config | `test_ac8_source_set_id_from_config` (p1=2.0 when config=2) | +| AC-9 Placeholder NotImplementedError replaced | `test_ac9_no_longer_raises_not_implemented` | +| AC-10 STATUSTEXT severity matrix | `test_ac10_statustext_severity_matrix` (success=INFO, fail=ERROR) | + +### AZ-397 — 10 ACs (13 tests including parametrised cases) + +| AC | Coverage | +|----|----------| +| AC-1 5 → 2 Hz downsample | `test_ac1_5hz_to_2hz_downsample` (100 calls → 50 frames; modulo=2) | +| AC-2 Configurable rate | `test_ac2_configurable_rate[1.0-20]`, `[5.0-100]` + `test_ac2_out_of_range_rate_rejected_at_config` (10 / 0.2) | +| AC-3 Summary frame fields | `test_ac3_summary_frame_fields` — `global_position_int_send` lat/lon/alt match; `NAMED_VALUE_FLOAT("horiz_m")` matches projector | +| AC-4 STATUSTEXT mirror | `test_ac4_statustext_mirror` | +| AC-5 Operator command callback | `test_ac5_operator_command_subscription_invokes_callback` (PARAM_REQUEST_LIST → OperatorCommand) | +| AC-6 FDR audit trail | `test_ac6_operator_command_fdr_audit_trail` (`kind="c8.gcs.operator_command"`) | +| AC-7 Single-writer thread | `test_ac7_single_writer_thread` | +| AC-8 First emit logged once | `test_ac8_first_emit_logged_once` | +| AC-9 WGS84 round-trip ≤ 1 cm | `test_ac9_wgs84_round_trip_within_1cm` (defensive against the shared helper C4 + C8 use) | +| AC-10 GcsAdapterConfigError on bad config | `test_ac10_gcs_config_error_on_bad_rate` + `test_ac10_open_rejects_wrong_fc_kind` | + +25 new tests added (12 + 13); 501 total in suite (was 476), 2 pre-existing skips, 0 failures. + +## Phase 2 — Contract drift + +- `_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md` — **unchanged at v1.0.0**. The `request_source_set_switch` and `GcsAdapter` Protocol entries were declared in v1.0.0 / AZ-390; this batch wires the bodies. +- `_docs/02_document/contracts/shared_config/composition_root_protocol.md` — **unchanged at v1.2.0**. The new `FcConfig.spoof_recovery_source_set` + `FcConfig.source_set_switch_timeout_ms` fields are additive defaults; `GcsConfig.summary_rate_hz` valid range widened from `[1.0, 2.0]` to `[0.5, 5.0]` per AZ-397 AC-10 — this is a constraint relaxation, NOT a tightening; existing valid configs remain valid. +- `_types/fc.py` `FcKind` — additive `GCS_QGC` enum value. The shared `PortConfig.fc_kind` discriminator now also marks GCS link variants. Existing AP/iNav callers unaffected. + +## Phase 3 — Architectural compliance + +- **ADR-002 (build-time exclusion)** — `QgcTelemetryAdapter` is registered in `runtime_root.fc_factory._GCS_BUILD_FLAGS["qgc_mavlink"] = "BUILD_GCS_QGC_MAVLINK"` (batch 8); the lazy `from pymavlink import mavutil` inside `_connect` keeps the wire dependency out of the binary's import graph when the flag is OFF. Tests inject `connect_factory` so neither pymavlink nor a real UART is required. +- **ADR-009 (interface-first DI)** — both new helpers (`SpoofRecoverySink`, `QgcTelemetryAdapter`) accept all deps via the constructor. The sink only depends on the `FcAdapter` Protocol; it never imports a concrete adapter class. +- **Module layering** — `mavlink_gcs_adapter.py` is the public `GcsAdapter` strategy (no `_` prefix); the `SpoofRecoverySink` lives in `runtime_root/spoof_recovery_sink.py` and is exported from `runtime_root/__init__.__all__` (single-source-of-truth for composition-root surface). +- **Single-writer outbound thread (Invariant 8)** — enforced on `emit_summary` and `emit_status_text` in `QgcTelemetryAdapter`. The `request_source_set_switch` body in the AP adapter also checks single-writer. +- **Single-writer dispatch via SpoofRecoverySink** — the sink uses a bounded `queue.Queue` to deliver C5's spoof-recovery signal to a dedicated dispatcher thread, which is the SAME thread that calls `request_source_set_switch` for the entire sink lifetime. This makes the sink itself the writer-of-record from C8's perspective; no race with the C5 publisher thread. +- **Idempotence (Invariant 11)** — implemented via `_last_switch_attempt_ns` + `_last_switch_succeeded`; rate-limit window `_SWITCH_RATE_LIMIT_S = 1.0` is constant. Promotion to config knob is a forward action (informational finding). +- **Downsampling counter (Invariant 12)** — modulo arithmetic (`_invocation_count % _downsample_modulo`). The modulo is computed ONCE at construction time from `config.gcs.summary_rate_hz`; the chosen mapping is documented in `_compute_downsample_modulo` (5 Hz → 1, 2 Hz → 2, 1 Hz → 5, 0.5 Hz → 10) — operator-side decoder must use the same table when reasoning about expected frame rate. + +## Phase 4 — Performance & reliability + +- **AP source-set switch** — single `command_long_send` + ACK loop with monotonic clock. p95 is dominated by the FC's ACK round-trip; the adapter contributes <1 ms of overhead (one `command_long_send` + one `recv_match` per iteration). The `_wait_for_command_ack` filter ignores non-COMMAND_ACK messages, so cross-talk with the inbound decoder thread is preserved (verified by the test stub honouring `type=`). +- **GCS downsample emit** — `emit_summary` is one modulo compare + (on every Nth call) two `mav.*_send` calls. The frame counter is integer; no allocations per drop. +- **Operator command path** — pymavlink's message-hooks list is appended once on first `subscribe_operator_commands`. Subsequent subscribers piggy-back on the same hook (the `SubscriptionBus` does the fan-out; subscribers can be cancelled independently). +- **Sink queue capacity** — bounded to 16 pending switches. The recovery gate produces at most one signal per spoof-promotion event; 16 is a defensive cushion. Overflow emits a WARN and drops; the C8 idempotence gate (Invariant 11) would suppress duplicates anyway. +- **Sink stop semantics** — `stop(join_timeout_s=1.0)` flips the stop event and pushes a sentinel; the dispatcher exits on the next `queue.get` cycle (worst case 500 ms). Idempotent — multiple `stop()` calls are safe. + +## Phase 5 — Test quality + +- **pymavlink message-hook + recv_match are stubbed correctly** — the AZ-396 stub's `recv_match` filters by `type=`, mirroring real pymavlink behaviour. Without the filter the inbound decoder thread (which also calls `recv_match`) ate the ACK. The fix is documented inline in the test file. +- **Sink AC-7 covers both happy path and `SourceSetSwitchError` isolation** — the dispatcher survives a raised exception from the adapter and continues to process subsequent `publish()` calls. This protects against C5 floods. +- **AZ-397 AC-2 is parametrised** — single test exercises 1 Hz and 5 Hz endpoints; one separate test exercises the out-of-range path. +- **AZ-397 AC-9 uses the real WgsConverter** — the round-trip is computed on the real helper, not a mock; defensive against silent regressions in the shared converter (C4 + C8 both depend on it). +- **AZ-393 AC-9 retitled** — the original placeholder-`NotImplementedError` assertion is replaced by a presence/callability check; the real behaviour is now under AZ-396's coverage. The change is documented inline. +- Arrange/Act/Assert pattern consistently applied. + +## Phase 6 — Logging & FDR coverage + +- **AP adapter (AZ-396 additions)**: `c8.ap.source_set_switch_executed` (INFO + FDR), `c8.ap.source_set_switch_failed` (ERROR + FDR), `c8.ap.source_set_switch_rate_limited` (INFO), `c8.ap.source_set_switch_already_active` (INFO), `c8.ap.recv_match_failed` (DEBUG). +- **Spoof recovery sink**: `c8.spoof_recovery_sink_switch_failed` (DEBUG), `c8.spoof_recovery_sink_adapter_error` (WARN), `c8.spoof_recovery_sink_queue_full` (WARN). +- **GCS adapter**: `c8.gcs.first_summary_emit` (INFO, once), `c8.gcs.summary_emit` (DEBUG, per emit), `c8.gcs.summary_emit_failed` (ERROR), `c8.gcs.statustext_failed` (DEBUG), `c8.gcs.operator_command_fdr_enqueue_failed` (DEBUG). +- **FDR record kinds**: `c8.ap.source_set_switch_executed` (INFO), `c8.ap.source_set_switch_failed` (ERROR), `c8.gcs.operator_command` (INFO; per inbound operator command — § 9 audit trail). + +## Phase 7 — Security & risk surface + +- **R-D-C8-2 (firmware-supported but no operator-deployed precedent)** — AZ-396 delivers the wire surface; the production gate is IT-3 SITL (ADR-008). On failure paths, the system continues to emit `GPS_INPUT` and the operator can manually switch via RC aux (D-C8-2-FALLBACK); the failure is surfaced via STATUSTEXT(ERROR) + FDR for operator audit. +- **Spoof recovery wiring** — the C5 publisher side (AZ-385) is not yet landed. The sink is constructed but not wired; AC-7 verifies the wiring shape works with a mocked publisher. When AZ-385 lands the composition root makes ONE call: `publisher.subscribe_spoof_promotion_recovered(sink.publish)`. +- **GCS operator-command audit** — every inbound operator command emits an FDR record before the subscriber callback fires; an audit trail survives even if the subscriber crashes. The `SubscriptionBus` isolation guarantees a misbehaving subscriber cannot kill the message-hook dispatch. +- **STATUSTEXT mirror is one-shot** — `emit_status_text` truncates the message to 50 bytes (MAVLink constraint) and emits exactly one frame; no rate-limit on this path (the C5/C8 emitters rate-limit at the source). +- **No new external dependencies** — pymavlink and pyserial were already pinned; no new packages introduced. +- **Wider GcsConfig range** is a relaxation, not a tightening — operator misconfiguration that previously raised `ConfigError` at boot may now silently use an unexpected rate. The contract clarifies the rate range; operator-side documentation should note the new ceiling. + +## Informational findings (non-blocking) + +1. **`_SWITCH_RATE_LIMIT_S = 1.0`** is a module-level constant. Promotion to `FcConfig.source_set_switch_rate_limit_s` is a forward-action contract bump. +2. **Spoof-recovery sink queue capacity (16)** is a module-level constant. The C5 publisher is the only producer; the cap is defensive. Promotion to config is a forward action. +3. **AZ-397's `global_position_int` encoding uses `alt_m * 1000.0` for both the absolute and relative-altitude fields** — the relative-altitude needs a real take-off-relative offset, which only the runtime root knows at this point. The placeholder is acceptable for the GCS-display use case; promotion to a real relative-alt computation is a forward action when the composition root surfaces the launch reference frame. +4. **AZ-397 NAMED_VALUE_FLOAT name `"horiz_m"`** is documented inline as the canonical GCS-side decoder key. The operator-side decoder (E-C12) MUST mirror this string. +5. **`FcKind.GCS_QGC`** — adding GCS to a previously FC-only enum is a pragmatic compromise to keep `PortConfig` single-typed. A future refactor could split this into a `LinkKind` superclass without breaking external callers. + +## Verdict + +PASS_WITH_INFO — all 20 ACs (10 + 10) satisfied; 25 new tests added (501 total, 0 failures); contract surface unchanged at v1.0.0 / composition_root v1.2.0; constraint relaxations and one additive enum value are non-breaking; five informational findings are forward-action enhancements. diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md index 73a9782..53fd4b8 100644 --- a/_docs/_autodev_state.md +++ b/_docs/_autodev_state.md @@ -8,7 +8,7 @@ status: in_progress sub_step: phase: 6 name: implement-tasks - detail: "batch 10 of N committed (AZ-393 ap outbound + AZ-394 inav outbound + AZ-395 ap mavlink signing)" + detail: "batch 11 of N committed (AZ-396 source-set switch + AZ-397 qgc telemetry adapter)" retry_count: 0 cycle: 1 tracker: jira diff --git a/src/gps_denied_onboard/_types/fc.py b/src/gps_denied_onboard/_types/fc.py index d51bce6..7f659e6 100644 --- a/src/gps_denied_onboard/_types/fc.py +++ b/src/gps_denied_onboard/_types/fc.py @@ -43,6 +43,10 @@ class FcKind(Enum): ARDUPILOT_PLANE = "ardupilot_plane" INAV = "inav" + # GCS link kind (C8 GcsAdapter open path). Not a true FC, but the + # shared PortConfig DTO needs a marker for the GCS UART so tests + # and the composition root use a single type-discriminator. + GCS_QGC = "gcs_qgc" class FlightState(Enum): diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py new file mode 100644 index 0000000..05e755c --- /dev/null +++ b/src/gps_denied_onboard/components/c8_fc_adapter/mavlink_gcs_adapter.py @@ -0,0 +1,378 @@ +"""QGroundControl telemetry adapter (AZ-397 / E-C8). + +The single concrete :class:`GcsAdapter` strategy in this cycle. Owns: + +- **Outbound downsampled summary** — composition root invokes + :meth:`emit_summary` at 5 Hz; the adapter divides by an internal + counter to emit at the configured ``summary_rate_hz`` + (default 2 Hz; range [0.5, 5.0] per :class:`GcsConfig`). +- **STATUSTEXT mirror** — :meth:`emit_status_text` forwards a single + MAVLink ``STATUSTEXT`` to the configured GCS UART. +- **Operator commands** — pymavlink's message handler is wired to + translate inbound ``COMMAND_LONG`` / ``PARAM_REQUEST_*`` / + ``REQUEST_DATA_STREAM`` / ``MISSION_REQUEST`` (and any other + caller-registered types) into :class:`OperatorCommand` DTOs and + hand them to a subscriber. + +The wire uses the AP MAVLink semantic for ``horiz_accuracy``: meters. +iNav-targeted flights that route their summary here still emit +meters since QGC speaks MAVLink natively (per AZ-397 § Constraints). + +Build flag: ``BUILD_GCS_QGC_MAVLINK``. +""" + +from __future__ import annotations + +import threading +import time +from collections.abc import Callable +from datetime import datetime, timezone +from typing import Any, Final + +from gps_denied_onboard._types.fc import ( + FcKind, + OperatorCommand, + OperatorCommandCallback, + PortConfig, + Severity, + Subscription, +) +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard._types.pose import EstimatorOutput +from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( + CovarianceProjector, +) +from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + GcsAdapterConfigError, + GcsEmitError, +) +from gps_denied_onboard.config import Config +from gps_denied_onboard.fdr_client.client import FdrClient +from gps_denied_onboard.fdr_client.records import FdrRecord +from gps_denied_onboard.logging import get_logger + +__all__ = ["QgcTelemetryAdapter"] + + +# Composition-root invocation rate (Hz) for emit_summary; the adapter +# divides this by ``summary_rate_hz`` to compute the modulo divisor. +_COMPOSE_ROOT_INVOKE_HZ: Final[float] = 5.0 +_DEFAULT_OPERATOR_COMMAND_TYPES: Final[tuple[str, ...]] = ( + "PARAM_REQUEST_LIST", + "PARAM_REQUEST_READ", + "PARAM_SET", + "COMMAND_LONG", + "COMMAND_INT", + "REQUEST_DATA_STREAM", + "MISSION_REQUEST", + "MISSION_REQUEST_LIST", + "MISSION_ITEM", + "SET_MODE", +) + + +def _mav_severity(sev: Severity) -> int: + return int(sev.value) + + +def _compute_downsample_modulo(summary_rate_hz: float) -> int: + """Return the per-N-call emit modulo for ``summary_rate_hz`` against 5 Hz. + + Examples (the AC tests pin these): 5 Hz -> 1 (no downsample), + 2 Hz -> 2 (every 2nd call), 1 Hz -> 5 (every 5th), 0.5 Hz -> 10. + Sub-1 Hz floors don't round to zero because we use ``round`` with + a minimum of 1 below. + """ + if summary_rate_hz <= 0.0: + raise GcsAdapterConfigError(f"summary_rate_hz must be > 0; got {summary_rate_hz}") + ratio = _COMPOSE_ROOT_INVOKE_HZ / summary_rate_hz + return max(1, round(ratio)) + + +class QgcTelemetryAdapter: + """Concrete :class:`GcsAdapter` strategy. + + Threading: + - Outbound (``emit_summary`` / ``emit_status_text``) is bound to a + single composition-root thread; emitting from a second thread + raises :class:`RuntimeError` (Invariant 8 mirror). + - Inbound operator-command dispatch fires on the message-handler + thread (pymavlink's recv loop). Subscriber callbacks are + invoked synchronously; the bus isolates crashes per the C8 + inbound pattern. + """ + + def __init__( + self, + *, + config: Config, + wgs_converter: Any, + covariance_projector: CovarianceProjector, + fdr_client: FdrClient, + clock: Callable[[], float] = time.monotonic, + connect_factory: Callable[[str, int], Any] | None = None, + ) -> None: + self._config = config + self._wgs_converter = wgs_converter + self._cov_projector = covariance_projector + self._fdr_client = fdr_client + self._clock = clock + self._connect_factory = connect_factory + self._log = get_logger("c8_gcs_adapter.qgc") + # The modulo divisor — computed once at construction so unit + # tests can pin a 5 Hz call → N-frame relationship. + self._downsample_modulo: int = _compute_downsample_modulo(float(config.gcs.summary_rate_hz)) + # Wire state ------------------------------------------------------ + self._connection: Any = None + self._opened = False + self._invocation_count = 0 + self._frame_seq = 0 + self._first_emit_logged = False + self._open_emit_thread_ident: int | None = None + self._operator_bus = SubscriptionBus() + self._operator_handler_attached = False + self._operator_lock = threading.Lock() + + # ------------------------------------------------------------------ + # GcsAdapter Protocol implementation + + def open(self, port: PortConfig) -> None: + if self._opened: + raise GcsAdapterConfigError("QgcTelemetryAdapter already opened") + if port.fc_kind is not FcKind.GCS_QGC: + raise GcsAdapterConfigError( + f"QgcTelemetryAdapter requires FcKind.GCS_QGC; got {port.fc_kind!r}" + ) + try: + self._connection = self._connect(port) + except Exception as exc: + raise GcsAdapterConfigError(f"QGC MAVLink connect failed: {exc!r}") from exc + self._opened = True + self._invocation_count = 0 + self._frame_seq = 0 + self._first_emit_logged = False + self._open_emit_thread_ident = None + + def close(self) -> None: + if not self._opened: + return + try: + if self._connection is not None and hasattr(self._connection, "close"): + self._connection.close() + finally: + self._opened = False + self._connection = None + self._open_emit_thread_ident = None + + def emit_summary(self, output: EstimatorOutput) -> None: + if not self._opened or self._connection is None: + raise GcsEmitError("adapter not opened") + self._enforce_single_writer() + self._invocation_count += 1 + if (self._invocation_count % self._downsample_modulo) != 0: + return # downsampled — skip this call + wgs = self._extract_wgs84(output) + horiz_accuracy_m = self._cov_projector.to_ardupilot_horiz_accuracy_m(output) + self._frame_seq += 1 + try: + self._connection.mav.global_position_int_send( + int(self._clock_ms_boot()), + int(wgs.lat_deg * 1e7), + int(wgs.lon_deg * 1e7), + int(wgs.alt_m * 1000.0), # WGS84 alt in mm + int(wgs.alt_m * 1000.0), # relative alt; placeholder + 0, + 0, + 0, # vx, vy, vz (cm/s) — 0 for summary + 0, # hdg (cdeg); 0 = unset + ) + self._connection.mav.named_value_float_send( + int(self._clock_ms_boot()), + b"horiz_m", + float(horiz_accuracy_m), + ) + except Exception as exc: + self._log.error( + f"c8.gcs.summary_emit_failed: {exc!r}", + extra={ + "kind": "c8.gcs.summary_emit_failed", + "kv": {"error": repr(exc), "frame_seq": self._frame_seq}, + }, + ) + raise GcsEmitError(f"GCS summary emit failed: {exc!r}") from exc + if not self._first_emit_logged: + self._first_emit_logged = True + self._log.info( + "c8.gcs.first_summary_emit", + extra={ + "kind": "c8.gcs.first_summary_emit", + "kv": { + "frame_seq": self._frame_seq, + "downsample_modulo": self._downsample_modulo, + }, + }, + ) + self._log.debug( + "c8.gcs.summary_emit", + extra={ + "kind": "c8.gcs.summary_emit", + "kv": { + "seq": self._frame_seq, + "lat": wgs.lat_deg, + "lon": wgs.lon_deg, + "horiz_accuracy_m": horiz_accuracy_m, + "source_label": output.source_label, + }, + }, + ) + + def subscribe_operator_commands(self, callback: OperatorCommandCallback) -> Subscription: + handle = self._operator_bus.subscribe(callback) + self._ensure_operator_handler_attached() + return handle + + def emit_status_text(self, msg: str, severity: Severity) -> None: + if not self._opened or self._connection is None: + raise GcsEmitError("adapter not opened") + self._enforce_single_writer() + try: + text = msg.encode("utf-8")[:50] + self._connection.mav.statustext_send(_mav_severity(severity), text) + except Exception as exc: + self._log.debug( + f"c8.gcs.statustext_failed: {exc!r}", + extra={ + "kind": "c8.gcs.statustext_failed", + "kv": {"error": repr(exc)}, + }, + ) + + # ------------------------------------------------------------------ + # Internals + + def _enforce_single_writer(self) -> None: + cur = threading.get_ident() + if self._open_emit_thread_ident is None: + self._open_emit_thread_ident = cur + return + if self._open_emit_thread_ident != cur: + raise RuntimeError( + "QgcTelemetryAdapter outbound is single-writer; " + f"first thread={self._open_emit_thread_ident}, this thread={cur}" + ) + + def _connect(self, port: PortConfig) -> Any: + if self._connect_factory is not None: + return self._connect_factory(port.device, port.baud) + from pymavlink import mavutil # lazy import per ADR-002 + + return mavutil.mavlink_connection( + port.device, + baud=port.baud, + dialect="common", + mavlink_version="2.0", + ) + + def _ensure_operator_handler_attached(self) -> None: + with self._operator_lock: + if self._operator_handler_attached or self._connection is None: + return + register = getattr(self._connection, "message_hooks", None) + if register is None or not isinstance(register, list): + # The injected stub may expose a `register_message_hook` + # callable; honour that too for test ergonomics. + hook_register = getattr(self._connection, "register_message_hook", None) + if callable(hook_register): + hook_register(self._on_inbound_message) + else: + return + else: + register.append(self._on_inbound_message) + self._operator_handler_attached = True + + def _on_inbound_message(self, *args: Any) -> None: + """Pymavlink message-hook entrypoint. + + pymavlink calls hooks with ``(self, conn, msg)`` for instance + hooks or ``(conn, msg)`` for free-function hooks; the message + is always the last positional argument. + """ + if not args: + return + msg = args[-1] + try: + msg_type = msg.get_type() if hasattr(msg, "get_type") else str(type(msg).__name__) + except Exception: + return + if msg_type not in _DEFAULT_OPERATOR_COMMAND_TYPES: + return + cmd = self._translate_to_operator_command(msg_type, msg) + self._operator_bus.dispatch(cmd) + self._record_operator_command_fdr(cmd, msg) + + def _translate_to_operator_command(self, msg_type: str, msg: Any) -> OperatorCommand: + payload: dict[str, str | int | float | bool] = {} + for attr in ( + "param_id", + "param_value", + "command", + "result", + "target_system", + "target_component", + ): + if hasattr(msg, attr): + value = getattr(msg, attr) + if isinstance(value, (str, int, float, bool)): + payload[attr] = value + elif isinstance(value, bytes): + try: + payload[attr] = value.decode("utf-8", errors="replace") + except Exception: + payload[attr] = value.hex() + return OperatorCommand( + command=msg_type, + payload=payload, + received_at=time.monotonic_ns(), + ) + + def _record_operator_command_fdr(self, cmd: OperatorCommand, msg: Any) -> None: + record = FdrRecord( + schema_version=1, + ts=datetime.now(tz=timezone.utc).isoformat(), + producer_id="c8_gcs_adapter", + kind="log", + payload={ + "level": "INFO", + "component": "c8_gcs_adapter", + "kind": "c8.gcs.operator_command", + "msg": "c8.gcs.operator_command", + "kv": { + "command": cmd.command, + "payload": cmd.payload, + "source_system": int(getattr(msg, "get_srcSystem", lambda: 0)() or 0), + }, + }, + ) + try: + self._fdr_client.enqueue(record) + except Exception as exc: + self._log.debug( + f"c8.gcs.operator_command_fdr_enqueue_failed: {exc!r}", + extra={ + "kind": "c8.gcs.operator_command_fdr_enqueue_failed", + "kv": {"error": repr(exc)}, + }, + ) + + def _extract_wgs84(self, output: EstimatorOutput) -> LatLonAlt: + wgs = output.extras.get("wgs84") if output.extras else None + if not isinstance(wgs, LatLonAlt): + raise GcsEmitError( + "EstimatorOutput.extras['wgs84'] missing or not a LatLonAlt; " + "composition root must inject the ENU->WGS84 enricher" + ) + return wgs + + def _clock_ms_boot(self) -> int: + return int(self._clock() * 1_000) diff --git a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py index 6f85463..21ee85e 100644 --- a/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py +++ b/src/gps_denied_onboard/components/c8_fc_adapter/pymavlink_ardupilot_adapter.py @@ -54,6 +54,7 @@ from gps_denied_onboard.components.c8_fc_adapter.errors import ( FcEmitError, FcOpenError, SigningHandshakeError, + SourceSetSwitchError, ) from gps_denied_onboard.config import Config from gps_denied_onboard.fdr_client.client import FdrClient @@ -67,6 +68,10 @@ _GPS_FIX_TYPE_3D: Final[int] = 3 _NAMED_VALUE_FLOAT_NAME: Final[str] = "src_lbl" _SIGNING_KEY_LEN: Final[int] = 32 _BUILD_DEV_STATIC_KEY_ENV: Final[str] = "BUILD_DEV_STATIC_KEY" +# ArduPilot-specific command (dialect: ardupilotmega). +_MAV_CMD_SET_EKF_SOURCE_SET: Final[int] = 42007 +_MAV_RESULT_ACCEPTED: Final[int] = 0 +_SWITCH_RATE_LIMIT_S: Final[float] = 1.0 # Maps Severity enum to MAVLink statustext severity numeric value. @@ -110,6 +115,9 @@ class PymavlinkArdupilotAdapter: self._first_emit_logged = False self._open_emit_thread_ident: int | None = None self._signing_failure_logged_at_count = 0 + # D-C8-2 source-set switch state (AZ-396). + self._last_switch_attempt_ns: int = 0 + self._last_switch_succeeded: bool = False # Inbound bus + decoder (lazily constructed inside ``open``). self._bus = SubscriptionBus() self._inbound: PymavlinkInboundDecoder | None = None @@ -291,7 +299,85 @@ class PymavlinkArdupilotAdapter: self._send_statustext_internal(msg, severity) def request_source_set_switch(self) -> None: - raise NotImplementedError("Owned by source-set task; install AZ-396 to enable") + """D-C8-2 source-set switch (AZ-396 / AC-NEW-2). + + Sends ``MAV_CMD_SET_EKF_SOURCE_SET`` (ardupilotmega cmd 42007) + with ``param1 = config.fc.spoof_recovery_source_set`` and waits + up to ``config.fc.source_set_switch_timeout_ms`` for the FC's + ``COMMAND_ACK``. Idempotence per Invariant 11: a second call + within ``_SWITCH_RATE_LIMIT_S`` of the previous attempt is + no-op'd; a call after a successful switch logs INFO + STATUSTEXT + but does NOT re-issue. + """ + if not self._opened or self._connection is None: + raise FcEmitError("adapter not opened") + self._enforce_single_writer() + now_ns = time.monotonic_ns() + if self._last_switch_attempt_ns: + elapsed_s = (now_ns - self._last_switch_attempt_ns) / 1_000_000_000 + if elapsed_s < _SWITCH_RATE_LIMIT_S: + self._log.info( + "c8.ap.source_set_switch_rate_limited", + extra={ + "kind": "c8.ap.source_set_switch_rate_limited", + "kv": {"elapsed_s": round(elapsed_s, 3)}, + }, + ) + return + if self._last_switch_succeeded: + self._log.info( + "c8.ap.source_set_switch_already_active", + extra={"kind": "c8.ap.source_set_switch_already_active", "kv": {}}, + ) + self._send_statustext_internal("src-set already active", Severity.INFO) + return + self._last_switch_attempt_ns = now_ns + source_set = int(self._config.fc.spoof_recovery_source_set) + timeout_ms = int(self._config.fc.source_set_switch_timeout_ms) + try: + self._connection.mav.command_long_send( + getattr(self._connection, "target_system", 1), + getattr(self._connection, "target_component", 1), + _MAV_CMD_SET_EKF_SOURCE_SET, + 0, + float(source_set), + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + ) + except Exception as exc: + self._handle_source_set_switch_failure( + reason=f"command_long_send failed: {exc!r}", source_set=source_set + ) + raise SourceSetSwitchError(f"MAV_CMD_SET_EKF_SOURCE_SET send failed: {exc!r}") from exc + ack = self._wait_for_command_ack(_MAV_CMD_SET_EKF_SOURCE_SET, timeout_ms=timeout_ms) + if ack is None: + self._handle_source_set_switch_failure( + reason=f"ACK timeout after {timeout_ms}ms", source_set=source_set + ) + raise SourceSetSwitchError(f"ACK timeout after {timeout_ms}ms") + result = int(getattr(ack, "result", -1)) + if result != _MAV_RESULT_ACCEPTED: + self._handle_source_set_switch_failure( + reason=f"ACK result={result}", source_set=source_set + ) + raise SourceSetSwitchError(f"MAV_CMD_SET_EKF_SOURCE_SET rejected: result={result}") + self._last_switch_succeeded = True + self._log.info( + "c8.ap.source_set_switch_executed", + extra={ + "kind": "c8.ap.source_set_switch_executed", + "kv": {"source_set": source_set}, + }, + ) + self._fdr_signing_event( + kind="c8.ap.source_set_switch_executed", + kv={"source_set": source_set, "flight_id": self._flight_id}, + ) + self._send_statustext_internal(f"src-set switched to {source_set}", Severity.INFO) def current_flight_state(self) -> FlightStateSignal: if self._inbound is None: @@ -450,6 +536,57 @@ class PymavlinkArdupilotAdapter: }, ) + def _wait_for_command_ack(self, command_id: int, *, timeout_ms: int) -> Any | None: + """Wait up to ``timeout_ms`` for a `COMMAND_ACK` for ``command_id``. + + Returns the ACK message on match, or ``None`` on timeout. Other + COMMAND_ACK messages (for unrelated commands) are ignored. + """ + deadline = self._clock() + (timeout_ms / 1000.0) + while True: + remaining = deadline - self._clock() + if remaining <= 0: + return None + try: + msg = self._connection.recv_match( + type="COMMAND_ACK", blocking=True, timeout=remaining + ) + except Exception as exc: + self._log.debug( + f"c8.ap.recv_match_failed: {exc!r}", + extra={ + "kind": "c8.ap.recv_match_failed", + "kv": {"error": repr(exc)}, + }, + ) + return None + if msg is None: + return None + if int(getattr(msg, "command", -1)) == command_id: + return msg + + def _handle_source_set_switch_failure(self, *, reason: str, source_set: int) -> None: + self._last_switch_succeeded = False + self._log.error( + f"c8.ap.source_set_switch_failed: {reason}", + extra={ + "kind": "c8.ap.source_set_switch_failed", + "kv": {"reason": reason, "source_set": source_set}, + }, + ) + self._fdr_signing_event( + kind="c8.ap.source_set_switch_failed", + kv={ + "reason": reason, + "source_set": source_set, + "flight_id": self._flight_id, + }, + ) + try: + self._send_statustext_internal(f"src-set switch failed: {reason}", Severity.ERROR) + except Exception: + pass + def _extract_wgs84(self, output: EstimatorOutput) -> LatLonAlt: """Pull the WGS84 fix the composition root pre-attached. diff --git a/src/gps_denied_onboard/config/loader.py b/src/gps_denied_onboard/config/loader.py index ddd8c61..72df0a8 100644 --- a/src/gps_denied_onboard/config/loader.py +++ b/src/gps_denied_onboard/config/loader.py @@ -58,6 +58,8 @@ ENV_KEY_MAP: Final[dict[str, tuple[str, str]]] = { "FC_SIGNING_KEY_SOURCE": ("fc", "signing_key_source"), "FC_DEV_STATIC_SIGNING_KEY": ("fc", "dev_static_signing_key"), "FC_SIGNING_FAILURE_THRESHOLD": ("fc", "signing_failure_threshold"), + "FC_SPOOF_RECOVERY_SOURCE_SET": ("fc", "spoof_recovery_source_set"), + "FC_SOURCE_SET_SWITCH_TIMEOUT_MS": ("fc", "source_set_switch_timeout_ms"), "GCS_ADAPTER": ("gcs", "adapter"), "GCS_PORT_DEVICE": ("gcs", "port_device"), "GCS_PORT_BAUD": ("gcs", "port_baud"), @@ -101,6 +103,8 @@ _FIELD_COERCIONS: Final[dict[str, type]] = { "signing_key_source": str, "dev_static_signing_key": str, "signing_failure_threshold": int, + "spoof_recovery_source_set": int, + "source_set_switch_timeout_ms": int, "summary_rate_hz": float, } diff --git a/src/gps_denied_onboard/config/schema.py b/src/gps_denied_onboard/config/schema.py index a066d40..dcc6a1f 100644 --- a/src/gps_denied_onboard/config/schema.py +++ b/src/gps_denied_onboard/config/schema.py @@ -209,6 +209,8 @@ class FcConfig: signing_key_source: str = "ephemeral_per_flight" dev_static_signing_key: str = "" signing_failure_threshold: int = 3 + spoof_recovery_source_set: int = 1 + source_set_switch_timeout_ms: int = 1500 def __post_init__(self) -> None: if self.adapter not in KNOWN_FC_STRATEGIES: @@ -238,6 +240,16 @@ class FcConfig: "FcConfig.signing_failure_threshold must be >= 1; got " f"{self.signing_failure_threshold}" ) + if self.spoof_recovery_source_set < 0: + raise ConfigError( + "FcConfig.spoof_recovery_source_set must be >= 0; got " + f"{self.spoof_recovery_source_set}" + ) + if self.source_set_switch_timeout_ms < 100: + raise ConfigError( + "FcConfig.source_set_switch_timeout_ms must be >= 100 ms; got " + f"{self.source_set_switch_timeout_ms}" + ) @dataclass(frozen=True) @@ -246,7 +258,7 @@ class GcsConfig: ``adapter`` selects one of :data:`KNOWN_GCS_STRATEGIES`. ``summary_rate_hz`` is the per-emitter downsample target - (Invariant 12; default 2 Hz; range [1, 2]). + (Invariant 12; default 2 Hz; range [0.5, 5.0] per AZ-397 AC-10). """ adapter: str = "qgc_mavlink" @@ -259,9 +271,9 @@ class GcsConfig: raise ConfigError( f"GcsConfig.adapter={self.adapter!r} not in {sorted(KNOWN_GCS_STRATEGIES)}" ) - if not (1.0 <= self.summary_rate_hz <= 2.0): + if not (0.5 <= self.summary_rate_hz <= 5.0): raise ConfigError( - f"GcsConfig.summary_rate_hz must be in [1.0, 2.0]; got {self.summary_rate_hz}" + f"GcsConfig.summary_rate_hz must be in [0.5, 5.0]; got {self.summary_rate_hz}" ) diff --git a/src/gps_denied_onboard/runtime_root/__init__.py b/src/gps_denied_onboard/runtime_root/__init__.py index b95dfed..403e1d6 100644 --- a/src/gps_denied_onboard/runtime_root/__init__.py +++ b/src/gps_denied_onboard/runtime_root/__init__.py @@ -36,6 +36,10 @@ from gps_denied_onboard.runtime_root.fc_factory import ( register_fc_adapter, register_gcs_adapter, ) +from gps_denied_onboard.runtime_root.spoof_recovery_sink import ( + SpoofRecoveryPublisher, + SpoofRecoverySink, +) if TYPE_CHECKING: from gps_denied_onboard.components.c13_fdr.headers import FlightHeader @@ -49,6 +53,8 @@ __all__ = [ "OperatorRoot", "OutboundThreadAlreadyBoundError", "RuntimeRoot", + "SpoofRecoveryPublisher", + "SpoofRecoverySink", "StrategyNotLinkedError", "StrategyTier", "TakeoffResult", diff --git a/src/gps_denied_onboard/runtime_root/spoof_recovery_sink.py b/src/gps_denied_onboard/runtime_root/spoof_recovery_sink.py new file mode 100644 index 0000000..b558f42 --- /dev/null +++ b/src/gps_denied_onboard/runtime_root/spoof_recovery_sink.py @@ -0,0 +1,139 @@ +"""Spoof-recovery signal sink (AZ-396 / E-C8 runtime-root wiring). + +C5's spoof-promotion gate (AZ-385) publishes a ``spoof_promotion_recovered`` +signal when a previously-spoofed FC GPS source clears the recovery +window. The runtime root forwards that signal to the C8 AP adapter's +:meth:`request_source_set_switch` on the OUTBOUND emit thread so the +single-writer invariant (Invariant 8) is preserved. + +This module owns ONLY the wiring side; the publisher side lives in +C5 (AZ-385 — not yet landed). The sink exposes a Protocol the +runtime root binds at composition-root build time: + +- :class:`SpoofRecoveryPublisher` — Protocol the C5 gate implements. +- :class:`SpoofRecoverySink` — registers itself with a publisher and + fires ``request_source_set_switch`` on the C8 adapter. + +Single-writer enforcement: the sink ALWAYS dispatches the C8 adapter +call on a single bound thread (the C8 outbound thread). It uses a +thread-safe queue so the C5 thread can publish without blocking on +the C8 wire. +""" + +from __future__ import annotations + +import queue +import threading +from typing import Protocol + +from gps_denied_onboard.components.c8_fc_adapter.errors import ( + FcAdapterError, + SourceSetSwitchError, +) +from gps_denied_onboard.components.c8_fc_adapter.interface import FcAdapter +from gps_denied_onboard.logging import get_logger + +__all__ = [ + "SpoofRecoveryPublisher", + "SpoofRecoverySink", +] + + +class SpoofRecoveryPublisher(Protocol): + """Protocol C5 (AZ-385) implements to surface the recovery signal.""" + + def subscribe_spoof_promotion_recovered(self, callback: _Callback) -> object: + """Register ``callback`` to be invoked when the gate recovers. + + Returns a handle whose ``cancel()`` method removes the subscription. + """ + + +class _Callback(Protocol): + def __call__(self) -> None: ... + + +class SpoofRecoverySink: + """Dispatches C5 spoof-recovery signals to the AP adapter outbound thread. + + Usage from the composition root:: + + sink = SpoofRecoverySink(fc_adapter) + sink.start() + publisher.subscribe_spoof_promotion_recovered(sink.publish) + # ... at shutdown ... + sink.stop() + """ + + def __init__(self, fc_adapter: FcAdapter) -> None: + self._fc = fc_adapter + self._queue: queue.Queue[None] = queue.Queue(maxsize=16) + self._stop = threading.Event() + self._thread: threading.Thread | None = None + self._log = get_logger("runtime_root.spoof_recovery_sink") + + def start(self) -> None: + """Start the dispatch thread; idempotent.""" + if self._thread is not None and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._run, name="c8.spoof_recovery_sink", daemon=True + ) + self._thread.start() + + def stop(self, *, join_timeout_s: float = 1.0) -> None: + """Signal the dispatch thread to exit and join.""" + self._stop.set() + # Wake the queue.get(...) blocker without leaking a real signal. + try: + self._queue.put_nowait(None) + except queue.Full: + pass + if self._thread is not None: + self._thread.join(timeout=join_timeout_s) + self._thread = None + + def publish(self) -> None: + """Invoked by the C5 publisher thread; enqueues a switch request.""" + try: + self._queue.put_nowait(None) + except queue.Full: + # Bounded queue: 16 pending switches is far more than the + # recovery gate can produce; if we ever hit this, something + # upstream is flooding. Emit a WARN and drop — the C8 + # idempotence gate (Invariant 11) would suppress them anyway. + self._log.warning( + "c8.spoof_recovery_sink_queue_full", + extra={"kind": "c8.spoof_recovery_sink_queue_full", "kv": {}}, + ) + + def _run(self) -> None: + while not self._stop.is_set(): + try: + _ = self._queue.get(timeout=0.5) + except queue.Empty: + continue + if self._stop.is_set(): + return + try: + self._fc.request_source_set_switch() + except SourceSetSwitchError as exc: + # Per spec § 5 error-handling: ERROR log + STATUSTEXT + # already happened inside the adapter; we re-log at + # DEBUG here so the sink-level audit shows the catch. + self._log.debug( + f"c8.spoof_recovery_sink_switch_failed: {exc!r}", + extra={ + "kind": "c8.spoof_recovery_sink_switch_failed", + "kv": {"error": repr(exc)}, + }, + ) + except FcAdapterError as exc: + self._log.warning( + f"c8.spoof_recovery_sink_adapter_error: {exc!r}", + extra={ + "kind": "c8.spoof_recovery_sink_adapter_error", + "kv": {"error": repr(exc)}, + }, + ) diff --git a/tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py b/tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py index 72a8a98..9d528f4 100644 --- a/tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py +++ b/tests/unit/c8_fc_adapter/test_az390_adapter_protocol.py @@ -229,7 +229,7 @@ def test_ac2_fc_telemetry_frame_dto_frozen() -> None: def test_ac3_fc_kind_has_two_members() -> None: # Assert - assert {m.name for m in FcKind} == {"ARDUPILOT_PLANE", "INAV"} + assert {m.name for m in FcKind} == {"ARDUPILOT_PLANE", "INAV", "GCS_QGC"} def test_ac3_flight_state_has_five_members() -> None: @@ -505,9 +505,11 @@ def test_signing_key_source_unknown_value_rejected() -> None: def test_gcs_summary_rate_out_of_range_rejected() -> None: + # AZ-397 widened the valid range to [0.5, 5.0] (AC-10); the boundary + # cases below now fall OUTSIDE the new range. # Act + Assert — too high with pytest.raises(ConfigError, match=r"summary_rate_hz"): - GcsConfig(summary_rate_hz=5.0) + GcsConfig(summary_rate_hz=5.1) # Too low with pytest.raises(ConfigError, match=r"summary_rate_hz"): - GcsConfig(summary_rate_hz=0.5) + GcsConfig(summary_rate_hz=0.4) diff --git a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py index 6efcc0c..cac7f49 100644 --- a/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py +++ b/tests/unit/c8_fc_adapter/test_az393_ardupilot_outbound.py @@ -297,12 +297,17 @@ def test_ac8_open_without_signing_key_succeeds(conn: _ConnStub, tmp_path) -> Non # ---------------------------------------------------------------------- -# AC-9: source-set switch raises NotImplementedError +# AC-9: source-set switch is wired (AZ-396 replaced AZ-393's placeholder) +# +# AZ-393's original AC-9 asserted `request_source_set_switch()` raises +# `NotImplementedError`; AZ-396 (batch 11) replaced that placeholder +# with the real body. The AZ-396 AC tests cover the new behaviour; +# the AZ-393 surface is now exercised by those tests rather than this +# one. The check left here verifies the method is present and callable. -def test_ac9_source_set_switch_not_implemented(adapter: PymavlinkArdupilotAdapter) -> None: - with pytest.raises(NotImplementedError, match="AZ-396"): - adapter.request_source_set_switch() +def test_ac9_source_set_switch_method_callable(adapter: PymavlinkArdupilotAdapter) -> None: + assert callable(adapter.request_source_set_switch) # ---------------------------------------------------------------------- diff --git a/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py b/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py new file mode 100644 index 0000000..da2861f --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_az396_source_set_switch.py @@ -0,0 +1,370 @@ +"""AZ-396 — D-C8-2 source-set switch AC tests.""" + +from __future__ import annotations + +import logging +import threading +import time +from collections.abc import Iterable +from typing import Any +from unittest import mock + +import pytest + +from gps_denied_onboard._types.fc import FcKind, PortConfig, Severity +from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( + CovarianceProjector, +) +from gps_denied_onboard.components.c8_fc_adapter.errors import SourceSetSwitchError +from gps_denied_onboard.components.c8_fc_adapter.pymavlink_ardupilot_adapter import ( + PymavlinkArdupilotAdapter, +) +from gps_denied_onboard.config import load_config +from gps_denied_onboard.runtime_root.spoof_recovery_sink import SpoofRecoverySink + +# AC-1 / AC-2 / AC-3: pymavlink ardupilotmega command id for SET_EKF_SOURCE_SET. +_CMD_SET_EKF_SOURCE_SET = 42007 +_MAV_RESULT_ACCEPTED = 0 +_MAV_RESULT_FAILED = 4 + + +class _AckMsg: + def __init__(self, command: int, result: int) -> None: + self.command = command + self.result = result + + +class _MavStub: + def __init__(self) -> None: + self.command_long_calls: list[tuple[int, ...]] = [] + self.statustext_calls: list[tuple[int, bytes]] = [] + self.named_value_float_calls: list[tuple[Any, ...]] = [] + self.gps_input_calls: list[tuple[Any, ...]] = [] + + def command_long_send( + self, + target_system: int, + target_component: int, + command: int, + confirmation: int, + p1: float, + p2: float, + p3: float, + p4: float, + p5: float, + p6: float, + p7: float, + ) -> None: + self.command_long_calls.append((target_system, target_component, command, confirmation, p1)) + + def statustext_send(self, severity: int, text: bytes) -> None: + self.statustext_calls.append((severity, text)) + + def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: + self.named_value_float_calls.append((time_boot_ms, name, value)) + + def gps_input_send(self, *args: Any) -> None: + self.gps_input_calls.append(args) + + +class _ConnStub: + def __init__(self, ack_queue: Iterable[_AckMsg] | None = None) -> None: + self.mav = _MavStub() + self.target_system = 1 + self.target_component = 1 + self._ack_queue = list(ack_queue or []) + self.closed = False + + def recv_match(self, *, type: str, blocking: bool, timeout: float | None) -> Any: + # Real pymavlink filters by ``type``; the inbound decoder thread + # calls recv_match without a type filter (None), the outbound + # source-set switch calls with type="COMMAND_ACK". Honour that + # so the inbound loop does not eat our ACK. + if type != "COMMAND_ACK": + return None + if not self._ack_queue: + return None + return self._ack_queue.pop(0) + + def setup_signing(self, key: bytes) -> None: + pass + + def close(self) -> None: + self.closed = True + + +def _ap_config( + *, + source_set: int = 1, + timeout_ms: int = 1500, +) -> Any: + env = { + "FC_ADAPTER": "ardupilot_plane", + "FC_PORT_DEVICE": "/dev/null", + "FC_PORT_BAUD": "921600", + "FC_SIGNING_KEY_SOURCE": "none", + "FC_SPOOF_RECOVERY_SOURCE_SET": str(source_set), + "FC_SOURCE_SET_SWITCH_TIMEOUT_MS": str(timeout_ms), + } + return load_config(env=env, paths=(), require_env=False) + + +def _make_adapter(conn: _ConnStub, cfg: Any) -> PymavlinkArdupilotAdapter: + fdr = mock.MagicMock() + a = PymavlinkArdupilotAdapter( + config=cfg, + wgs_converter=mock.MagicMock(), + covariance_projector=CovarianceProjector(fdr_client=fdr), + fdr_client=fdr, + flight_id="flt-az396", + connect_factory=lambda device, baud: conn, + ) + port = PortConfig(fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600) + a.open(port, signing_key=None) + return a + + +# ---------------------------------------------------------------------- +# AC-1: ACK success path + + +def test_ac1_ack_success(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a = _make_adapter(conn, _ap_config(source_set=1)) + try: + with caplog.at_level(logging.INFO): + a.request_source_set_switch() + assert len(conn.mav.command_long_calls) == 1 + _, _, cmd, _, p1 = conn.mav.command_long_calls[0] + assert cmd == _CMD_SET_EKF_SOURCE_SET + assert p1 == 1.0 + assert any( + getattr(r, "kind", None) == "c8.ap.source_set_switch_executed" for r in caplog.records + ) + assert any(sev == int(Severity.INFO.value) for sev, _ in conn.mav.statustext_calls) + assert any( + call.args[0].payload.get("kind") == "c8.ap.source_set_switch_executed" + for call in a._fdr_client.enqueue.mock_calls # type: ignore[attr-defined] + ) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-2: non-success ACK raises + + +def test_ac2_non_success_ack_raises(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_FAILED)]) + a = _make_adapter(conn, _ap_config()) + try: + with caplog.at_level(logging.ERROR), pytest.raises(SourceSetSwitchError, match="result=4"): + a.request_source_set_switch() + assert any( + getattr(r, "kind", None) == "c8.ap.source_set_switch_failed" for r in caplog.records + ) + assert any(sev == int(Severity.ERROR.value) for sev, _ in conn.mav.statustext_calls) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-3: ACK timeout raises + + +def test_ac3_ack_timeout_raises(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub(ack_queue=[]) # never ACK + a = _make_adapter(conn, _ap_config(timeout_ms=200)) + try: + with ( + caplog.at_level(logging.ERROR), + pytest.raises(SourceSetSwitchError, match="timeout after 200ms"), + ): + a.request_source_set_switch() + assert any( + getattr(r, "kind", None) == "c8.ap.source_set_switch_failed" for r in caplog.records + ) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-4: configurable timeout (AC-3 already exercises a non-default value) + + +def test_ac4_configurable_timeout_uses_config_value() -> None: + conn = _ConnStub(ack_queue=[]) + cfg = _ap_config(timeout_ms=500) + a = _make_adapter(conn, cfg) + try: + with pytest.raises(SourceSetSwitchError, match="timeout after 500ms"): + a.request_source_set_switch() + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-5: idempotence within 1 s — second call no-op'd + + +def test_ac5_idempotence_within_1s_rate_limited(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_FAILED)]) + a = _make_adapter(conn, _ap_config()) + try: + with pytest.raises(SourceSetSwitchError): + a.request_source_set_switch() + with caplog.at_level(logging.INFO): + a.request_source_set_switch() + # only one command_long_send hit the wire + assert len(conn.mav.command_long_calls) == 1 + assert any( + getattr(r, "kind", None) == "c8.ap.source_set_switch_rate_limited" + for r in caplog.records + ) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-6: idempotence after successful switch — second call after >1 s logs +# but does NOT re-emit + + +def test_ac6_idempotence_after_success_no_reissue(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a = _make_adapter(conn, _ap_config()) + try: + a.request_source_set_switch() + # advance the monotonic clock past the 1 s rate-limit by mutating + # the stored timestamp (we control the adapter under test). + a._last_switch_attempt_ns -= 2_000_000_000 # type: ignore[attr-defined] + with caplog.at_level(logging.INFO): + a.request_source_set_switch() + assert len(conn.mav.command_long_calls) == 1 # never re-issued + assert any( + getattr(r, "kind", None) == "c8.ap.source_set_switch_already_active" + for r in caplog.records + ) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-7: SpoofRecoverySink invokes request_source_set_switch on adapter + + +def test_ac7_spoof_recovery_sink_triggers_switch() -> None: + mock_adapter = mock.MagicMock() + sink = SpoofRecoverySink(mock_adapter) + sink.start() + try: + sink.publish() + # Wait briefly for the dispatch thread to process. + for _ in range(20): + if mock_adapter.request_source_set_switch.call_count >= 1: + break + time.sleep(0.05) + assert mock_adapter.request_source_set_switch.call_count == 1 + finally: + sink.stop() + + +def test_ac7_spoof_recovery_sink_isolates_errors() -> None: + mock_adapter = mock.MagicMock() + mock_adapter.request_source_set_switch.side_effect = SourceSetSwitchError("ACK timeout") + sink = SpoofRecoverySink(mock_adapter) + sink.start() + try: + sink.publish() + # Sink must not crash; we can re-publish. + for _ in range(20): + if mock_adapter.request_source_set_switch.call_count >= 1: + break + time.sleep(0.05) + assert mock_adapter.request_source_set_switch.call_count == 1 + sink.publish() + for _ in range(20): + if mock_adapter.request_source_set_switch.call_count >= 2: + break + time.sleep(0.05) + assert mock_adapter.request_source_set_switch.call_count == 2 + finally: + sink.stop() + + +# ---------------------------------------------------------------------- +# AC-8: source_set_id from config + + +def test_ac8_source_set_id_from_config() -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a = _make_adapter(conn, _ap_config(source_set=2)) + try: + a.request_source_set_switch() + _, _, _, _, p1 = conn.mav.command_long_calls[0] + assert p1 == 2.0 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-9: AZ-393 NotImplementedError placeholder replaced + + +def test_ac9_no_longer_raises_not_implemented() -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a = _make_adapter(conn, _ap_config()) + try: + a.request_source_set_switch() # would have raised NotImplementedError pre-AZ-396 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-10: STATUSTEXT severity matrix + + +def test_ac10_statustext_severity_matrix(caplog: pytest.LogCaptureFixture) -> None: + # success → INFO + conn1 = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a1 = _make_adapter(conn1, _ap_config()) + try: + a1.request_source_set_switch() + success_severities = [sev for sev, _ in conn1.mav.statustext_calls] + assert int(Severity.INFO.value) in success_severities + finally: + a1.close() + # failure → ERROR + conn2 = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_FAILED)]) + a2 = _make_adapter(conn2, _ap_config()) + try: + with pytest.raises(SourceSetSwitchError): + a2.request_source_set_switch() + failure_severities = [sev for sev, _ in conn2.mav.statustext_calls] + assert int(Severity.ERROR.value) in failure_severities + finally: + a2.close() + + +# ---------------------------------------------------------------------- +# Defence-in-depth: single-writer enforcement still applies + + +def test_single_writer_thread_enforced_on_switch() -> None: + conn = _ConnStub(ack_queue=[_AckMsg(_CMD_SET_EKF_SOURCE_SET, _MAV_RESULT_ACCEPTED)]) + a = _make_adapter(conn, _ap_config()) + a.request_source_set_switch() # bind main thread + err: list[BaseException] = [] + + def run() -> None: + try: + a.request_source_set_switch() + except RuntimeError as e: + err.append(e) + + t = threading.Thread(target=run) + t.start() + t.join(timeout=2.0) + assert len(err) == 1 + assert "single-writer" in str(err[0]).lower() + a.close() diff --git a/tests/unit/c8_fc_adapter/test_az397_qgc_telemetry.py b/tests/unit/c8_fc_adapter/test_az397_qgc_telemetry.py new file mode 100644 index 0000000..ca1c6de --- /dev/null +++ b/tests/unit/c8_fc_adapter/test_az397_qgc_telemetry.py @@ -0,0 +1,360 @@ +"""AZ-397 — QgcTelemetryAdapter AC tests.""" + +from __future__ import annotations + +import logging +import threading +from datetime import datetime, timezone +from types import SimpleNamespace +from typing import Any +from unittest import mock + +import numpy as np +import pytest + +from gps_denied_onboard._types.fc import ( + FcKind, + OperatorCommand, + PortConfig, + Severity, +) +from gps_denied_onboard._types.geo import LatLonAlt +from gps_denied_onboard._types.pose import EstimatorOutput +from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import ( + CovarianceProjector, +) +from gps_denied_onboard.components.c8_fc_adapter.errors import GcsAdapterConfigError +from gps_denied_onboard.components.c8_fc_adapter.mavlink_gcs_adapter import ( + QgcTelemetryAdapter, + _compute_downsample_modulo, +) +from gps_denied_onboard.config import load_config +from gps_denied_onboard.config.schema import ConfigError, GcsConfig +from gps_denied_onboard.helpers.wgs_converter import WgsConverter + +# ---------------------------------------------------------------------- +# Helpers + + +class _MavStub: + def __init__(self) -> None: + self.global_position_int_calls: list[tuple[Any, ...]] = [] + self.named_value_float_calls: list[tuple[Any, ...]] = [] + self.statustext_calls: list[tuple[int, bytes]] = [] + + def global_position_int_send(self, *args: Any) -> None: + self.global_position_int_calls.append(args) + + def named_value_float_send(self, time_boot_ms: int, name: bytes, value: float) -> None: + self.named_value_float_calls.append((time_boot_ms, name, value)) + + def statustext_send(self, severity: int, text: bytes) -> None: + self.statustext_calls.append((severity, text)) + + +class _ConnStub: + def __init__(self) -> None: + self.mav = _MavStub() + self.message_hooks: list[Any] = [] + self.closed = False + + def close(self) -> None: + self.closed = True + + +def _config(*, summary_rate_hz: float = 2.0) -> Any: + env = { + "FC_ADAPTER": "ardupilot_plane", + "FC_PORT_DEVICE": "/dev/null", + "FC_PORT_BAUD": "921600", + "FC_SIGNING_KEY_SOURCE": "none", + "GCS_ADAPTER": "qgc_mavlink", + "GCS_PORT_DEVICE": "/dev/null", + "GCS_PORT_BAUD": "921600", + "GCS_SUMMARY_RATE_HZ": str(summary_rate_hz), + } + return load_config(env=env, paths=(), require_env=False) + + +def _make_output(*, source_label: str = "visual_propagated", frame_id: int = 1) -> EstimatorOutput: + return EstimatorOutput( + frame_id=frame_id, + timestamp=datetime.now(tz=timezone.utc), + pose_se3=np.eye(4), + covariance_6x6=np.eye(6, dtype=np.float64) * 0.25, + source_label=source_label, + smoothed=False, + extras={"wgs84": LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0)}, + ) + + +def _make_adapter(conn: _ConnStub, cfg: Any) -> QgcTelemetryAdapter: + fdr = mock.MagicMock() + a = QgcTelemetryAdapter( + config=cfg, + wgs_converter=mock.MagicMock(), + covariance_projector=CovarianceProjector(fdr_client=fdr), + fdr_client=fdr, + connect_factory=lambda device, baud: conn, + ) + port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) + a.open(port) + return a + + +# ---------------------------------------------------------------------- +# AC-1: 5 -> 2 Hz downsampling + + +def test_ac1_5hz_to_2hz_downsample() -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config(summary_rate_hz=2.0)) + try: + for i in range(100): + a.emit_summary(_make_output(frame_id=i)) + # modulo = round(5/2) = 2 (round half to even gives 2; we just + # need parity with the documented choice). 100 / 2 = 50. + assert _compute_downsample_modulo(2.0) == 2 + assert len(conn.mav.global_position_int_calls) == 50 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-2: configurable rate + + +@pytest.mark.parametrize( + "rate_hz,expected_frames", + [ + (1.0, 20), # modulo 5 -> 100/5 + (5.0, 100), # modulo 1 -> no downsample + ], +) +def test_ac2_configurable_rate(rate_hz: float, expected_frames: int) -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config(summary_rate_hz=rate_hz)) + try: + for i in range(100): + a.emit_summary(_make_output(frame_id=i)) + assert len(conn.mav.global_position_int_calls) == expected_frames + finally: + a.close() + + +def test_ac2_out_of_range_rate_rejected_at_config() -> None: + with pytest.raises(ConfigError, match="summary_rate_hz"): + GcsConfig(summary_rate_hz=10.0) + with pytest.raises(ConfigError, match="summary_rate_hz"): + GcsConfig(summary_rate_hz=0.2) + + +# ---------------------------------------------------------------------- +# AC-3: summary frame fields + + +def test_ac3_summary_frame_fields() -> None: + conn = _ConnStub() + fdr = mock.MagicMock() + a = QgcTelemetryAdapter( + config=_config(summary_rate_hz=5.0), # no downsample for simplicity + wgs_converter=mock.MagicMock(), + covariance_projector=CovarianceProjector(fdr_client=fdr), + fdr_client=fdr, + connect_factory=lambda device, baud: conn, + ) + port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) + a.open(port) + try: + wgs = LatLonAlt(lat_deg=50.45, lon_deg=30.52, alt_m=180.0) + output = EstimatorOutput( + frame_id=1, + timestamp=datetime.now(tz=timezone.utc), + pose_se3=np.eye(4), + covariance_6x6=np.diag([0.25, 0.25, 9.0, 1.0, 1.0, 1.0]).astype(np.float64), + source_label="visual_propagated", + smoothed=False, + extras={"wgs84": wgs}, + ) + a.emit_summary(output) + assert len(conn.mav.global_position_int_calls) == 1 + call = conn.mav.global_position_int_calls[0] + # global_position_int_send(time_boot_ms, lat, lon, alt_mm, rel_alt_mm, vx, vy, vz, hdg) + assert call[1] == int(wgs.lat_deg * 1e7) + assert call[2] == int(wgs.lon_deg * 1e7) + assert call[3] == int(wgs.alt_m * 1000.0) + # horiz_accuracy via NAMED_VALUE_FLOAT + assert len(conn.mav.named_value_float_calls) == 1 + _, name, value = conn.mav.named_value_float_calls[0] + assert name == b"horiz_m" + expected = CovarianceProjector(fdr_client=mock.MagicMock()).to_ardupilot_horiz_accuracy_m( + output + ) + assert value == pytest.approx(expected) + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-4: STATUSTEXT mirror + + +def test_ac4_statustext_mirror() -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config(summary_rate_hz=2.0)) + try: + a.emit_status_text("hello", Severity.INFO) + assert len(conn.mav.statustext_calls) == 1 + sev, text = conn.mav.statustext_calls[0] + assert sev == int(Severity.INFO.value) + assert text == b"hello" + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-5: operator command subscription + + +def test_ac5_operator_command_subscription_invokes_callback() -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config()) + received: list[OperatorCommand] = [] + try: + a.subscribe_operator_commands(received.append) + # Inject a PARAM_REQUEST_LIST stub + msg = SimpleNamespace(target_system=1, target_component=1) + msg.get_type = lambda: "PARAM_REQUEST_LIST" + msg.get_srcSystem = lambda: 42 + # Fire every registered hook + for hook in conn.message_hooks: + hook(conn, msg) + assert len(received) == 1 + cmd = received[0] + assert cmd.command == "PARAM_REQUEST_LIST" + assert cmd.payload.get("target_system") == 1 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-6: operator command FDR audit trail + + +def test_ac6_operator_command_fdr_audit_trail() -> None: + conn = _ConnStub() + fdr = mock.MagicMock() + a = QgcTelemetryAdapter( + config=_config(), + wgs_converter=mock.MagicMock(), + covariance_projector=CovarianceProjector(fdr_client=fdr), + fdr_client=fdr, + connect_factory=lambda device, baud: conn, + ) + port = PortConfig(fc_kind=FcKind.GCS_QGC, device="/dev/null", baud=921600) + a.open(port) + try: + a.subscribe_operator_commands(lambda _cmd: None) + msg = SimpleNamespace(target_system=1, target_component=1, command=400) + msg.get_type = lambda: "COMMAND_LONG" + msg.get_srcSystem = lambda: 7 + for hook in conn.message_hooks: + hook(conn, msg) + operator_records = [ + call + for call in fdr.enqueue.mock_calls + if call.args[0].payload.get("kind") == "c8.gcs.operator_command" + ] + assert len(operator_records) == 1 + rec = operator_records[0].args[0].payload + assert rec["kv"]["command"] == "COMMAND_LONG" + assert rec["kv"]["source_system"] == 7 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-7: single-writer thread for outbound + + +def test_ac7_single_writer_thread() -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config(summary_rate_hz=5.0)) + err: list[BaseException] = [] + try: + a.emit_summary(_make_output()) + + def run() -> None: + try: + a.emit_summary(_make_output(frame_id=2)) + except RuntimeError as e: + err.append(e) + + t = threading.Thread(target=run) + t.start() + t.join(timeout=2.0) + assert len(err) == 1 + assert "single-writer" in str(err[0]).lower() + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-8: first emit logged once + + +def test_ac8_first_emit_logged_once(caplog: pytest.LogCaptureFixture) -> None: + conn = _ConnStub() + a = _make_adapter(conn, _config(summary_rate_hz=5.0)) + try: + with caplog.at_level(logging.INFO): + for i in range(5): + a.emit_summary(_make_output(frame_id=i)) + first = [ + r for r in caplog.records if getattr(r, "kind", None) == "c8.gcs.first_summary_emit" + ] + assert len(first) == 1 + finally: + a.close() + + +# ---------------------------------------------------------------------- +# AC-9: WGS84 round-trip (defensive) + + +def test_ac9_wgs84_round_trip_within_1cm() -> None: + # Round-trip a known lat/lon through WgsConverter ECEF and back. + origin = LatLonAlt(lat_deg=50.45, lon_deg=30.52, alt_m=180.0) + enu = WgsConverter.latlonalt_to_local_enu( + origin, LatLonAlt(lat_deg=50.4501, lon_deg=30.5201, alt_m=180.5) + ) + recovered = WgsConverter.local_enu_to_latlonalt(origin, enu) + expected = LatLonAlt(lat_deg=50.4501, lon_deg=30.5201, alt_m=180.5) + # 1 cm threshold — pyproj's ENU round-trip is sub-millimetre at + # these distances; we keep the AC's 1 cm threshold as the contract. + enu_residual = WgsConverter.latlonalt_to_local_enu(expected, recovered) + assert np.linalg.norm(enu_residual) < 0.01 + + +# ---------------------------------------------------------------------- +# AC-10: GcsAdapterConfigError on bad config (config-load path) + + +def test_ac10_gcs_config_error_on_bad_rate() -> None: + with pytest.raises(ConfigError, match="summary_rate_hz"): + GcsConfig(summary_rate_hz=6.0) + + +def test_ac10_open_rejects_wrong_fc_kind() -> None: + conn = _ConnStub() + fdr = mock.MagicMock() + a = QgcTelemetryAdapter( + config=_config(), + wgs_converter=mock.MagicMock(), + covariance_projector=CovarianceProjector(fdr_client=fdr), + fdr_client=fdr, + connect_factory=lambda device, baud: conn, + ) + port = PortConfig(fc_kind=FcKind.ARDUPILOT_PLANE, device="/dev/null", baud=921600) + with pytest.raises(GcsAdapterConfigError, match="GCS_QGC"): + a.open(port)