"""ArduPilot Plane / iNav SITL state-read observers (AZ-595 FDR-replay strategy). All 11 public surfaces are backed by JSON files under ``${E2E_SITL_REPLAY_DIR}/`` — there is no live pymavlink / yamspy / TCP connection in this implementation. This intentionally decouples scenario execution from live SITL infrastructure: tests can run deterministically against runner-produced fixture files, and a future "live" strategy can plug in behind the same surface without changing any scenario code. When ``E2E_SITL_REPLAY_DIR`` is unset OR the corresponding fixture file is missing: * `read_*` surfaces return an **empty list** (vacuous). Scenarios use the module-level ``replay_dir_available()`` probe to detect this and skip. * `prepare_sitl_*` surfaces are no-ops (FDR-replay does not need to actually configure SITL state — the fixture file IS the prepared state). * `capture_ap_tlog` / `read_ap_parameter` / `query_inav_gps_state` / `observe_inav_tcp_handshake` / `collect_inav_msp_frames` raise ``RuntimeError`` because they require non-empty fixture data to produce a meaningful result. Fixture file naming (under `${E2E_SITL_REPLAY_DIR}/`): * `ekf_divergence_events.json` — list[{monotonic_ms, severity, message}] * `gps_health_samples.json` — list[{monotonic_ms, healthy, spoofed}] * `consistency_check_events.json` — list[{monotonic_ms, passed}] * `observer__.json` — {gps_state: {...}, parameters: {...}} * `outbound_messages__.json` — {messages: [{image_id?, lat_deg, lon_deg} | null, ...]} * `ap_parameters_.json` — {: , ...} * `ap_tlog_.tlog` — raw mavproxy tlog (any binary content) * `gcs_tlog_.tlog` — raw mavproxy-listener tlog from the GCS link (SUT→GCS summary stream + GCS→SUT operator commands; FT-P-12, FT-P-13) * `inav_handshake_.json` — {established_within_s: float | None} * `inav_msp_frames_.json` — {frames: [...], expected_num_sat: int} * `inav_gps_state_.json` — {fix_type, num_sat, provider} Public-boundary discipline: this module does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import json import os from dataclasses import dataclass, field from pathlib import Path from typing import Iterable, Literal _ENV_VAR = "E2E_SITL_REPLAY_DIR" FcKind = Literal["ardupilot", "inav"] # Dataclasses @dataclass(frozen=True) class FcGpsState: """The subset of FC state the e2e tests assert against.""" primary_source: str last_position_lat_deg: float last_position_lon_deg: float last_position_alt_m: float fix_quality: int horizontal_accuracy_m: float last_update_age_ms: int @dataclass(frozen=True) class EkfDivergenceEvent: monotonic_ms: int severity: str message: str @dataclass(frozen=True) class GpsHealthSample: monotonic_ms: int healthy: bool spoofed: bool @dataclass(frozen=True) class ConsistencyCheckEvent: monotonic_ms: int passed: bool @dataclass(frozen=True) class TcpHandshakeReport: """Result of an iNav SITL TCP handshake observation.""" established_within_s: float | None @dataclass(frozen=True) class MspFrameSample: monotonic_ms: int function_id: int @dataclass(frozen=True) class MspFrameCapture: """One window of MSP frame samples from the iNav SITL.""" frames: list[MspFrameSample] expected_num_sat: int @dataclass(frozen=True) class InavGpsState: fix_type: int num_sat: int provider: str @dataclass(frozen=True) class OutboundMessage: """One outbound GPS estimate captured from the SUT. Both ArduPilot ``GPS_INPUT`` and iNav ``MSP2_SENSOR_GPS`` are projected into this minimal shape because the scenarios consuming `wait_for_outbound` only care about the geo-coordinates. The optional `image_id` round-trips for diagnostics but is not part of the consumer contract. """ lat_deg: float lon_deg: float image_id: str | None = None # Observer interface (returned by ``get_observer``) @dataclass class _FdrReplayObserver: """FDR-replay observer — reads SUT state from JSON fixtures. `_payload` holds the observer configuration fixture (`observer__.json`). Cursor state for `wait_for_outbound` is intentionally lazy — the outbound-messages fixture is loaded on the first call so observers constructed for scenarios that never call `wait_for_outbound` don't pay the I/O. """ fc_kind: FcKind host: str _payload: dict _outbound_cursor: int = 0 _outbound_messages: list[dict | None] | None = field(default=None, repr=False) def read_gps_state(self) -> FcGpsState: gps = self._payload.get("gps_state") if not isinstance(gps, dict): raise RuntimeError( f"sitl_observer ({self.fc_kind}/{self.host}): fixture missing `gps_state` object" ) return FcGpsState( primary_source=str(gps["primary_source"]), last_position_lat_deg=float(gps["last_position_lat_deg"]), last_position_lon_deg=float(gps["last_position_lon_deg"]), last_position_alt_m=float(gps["last_position_alt_m"]), fix_quality=int(gps["fix_quality"]), horizontal_accuracy_m=float(gps["horizontal_accuracy_m"]), last_update_age_ms=int(gps["last_update_age_ms"]), ) def read_parameter(self, name: str) -> float | int | str | None: params = self._payload.get("parameters", {}) if not isinstance(params, dict): raise RuntimeError( f"sitl_observer ({self.fc_kind}/{self.host}): fixture `parameters` must be an object" ) return params.get(name) def wait_for_outbound(self, timeout_s: float | None = None) -> OutboundMessage: """Return the next captured outbound GPS estimate (cursor-based replay). `timeout_s` is accepted for live-mode parity and ignored in replay mode — the fixture already encodes per-call timeouts as `null` entries. Raises: TimeoutError: cursor entry is `null` (SUT didn't emit anything for the corresponding image during capture). RuntimeError: fixture missing OR malformed OR cursor advanced past the messages list length. """ if self._outbound_messages is None: self._outbound_messages = _load_outbound_messages(self.fc_kind, self.host) if self._outbound_cursor >= len(self._outbound_messages): raise RuntimeError( f"sitl_observer ({self.fc_kind}/{self.host}): " f"outbound messages fixture exhausted after " f"{self._outbound_cursor} call(s); scenario expects more" ) entry = self._outbound_messages[self._outbound_cursor] self._outbound_cursor += 1 if entry is None: raise TimeoutError( f"sitl_observer ({self.fc_kind}/{self.host}): " f"outbound message #{self._outbound_cursor} captured as " f"timeout in fixture (timeout_s={timeout_s})" ) return OutboundMessage( lat_deg=float(entry["lat_deg"]), lon_deg=float(entry["lon_deg"]), image_id=entry.get("image_id"), ) def _load_outbound_messages(fc_kind: FcKind, host: str) -> list[dict | None]: """Load + validate `outbound_messages__.json`. Returns the validated `messages` list (None entries preserved). Raises RuntimeError on any malformed shape so observers fail loudly rather than hand out garbage. """ payload, path = _load_required_json(f"outbound_messages_{fc_kind}_{host}.json") raw = payload.get("messages") if not isinstance(raw, list): raise RuntimeError( f"sitl_observer outbound fixture {path}: " f"`messages` must be a JSON list; got {type(raw).__name__}" ) validated: list[dict | None] = [] for idx, entry in enumerate(raw): if entry is None: validated.append(None) continue if not isinstance(entry, dict): raise RuntimeError( f"sitl_observer outbound fixture {path}: " f"messages[{idx}] must be a JSON object or null; got {type(entry).__name__}" ) if "lat_deg" not in entry or "lon_deg" not in entry: raise RuntimeError( f"sitl_observer outbound fixture {path}: " f"messages[{idx}] missing required `lat_deg`/`lon_deg` keys" ) validated.append(entry) return validated # Module-level helpers def replay_dir() -> Path | None: """Resolve the FDR-replay fixture root from the env var, or None if unset.""" raw = os.environ.get(_ENV_VAR, "").strip() if not raw: return None return Path(raw) def replay_dir_available() -> bool: """True iff ``E2E_SITL_REPLAY_DIR`` is set AND points to an existing directory.""" root = replay_dir() return root is not None and root.is_dir() def _load_optional_json_list(filename: str, parser) -> list: """Load `${E2E_SITL_REPLAY_DIR}/`; return [] when absent.""" root = replay_dir() if root is None: return [] path = root / filename if not path.exists(): return [] decoded = json.loads(path.read_text()) if not isinstance(decoded, list): raise RuntimeError( f"sitl_observer fixture {path} must be a JSON list; got {type(decoded).__name__}" ) return [parser(item, path) for item in decoded] def _load_required_json(filename: str) -> tuple[dict, Path]: """Load `${E2E_SITL_REPLAY_DIR}/`; raise RuntimeError when absent.""" root = replay_dir() if root is None: raise RuntimeError( f"sitl_observer: {_ENV_VAR} env var not set; cannot read fixture {filename}" ) path = root / filename if not path.exists(): raise RuntimeError( f"sitl_observer: required fixture not found: {path}" ) decoded = json.loads(path.read_text()) if not isinstance(decoded, dict): raise RuntimeError( f"sitl_observer fixture {path} must be a JSON object; got {type(decoded).__name__}" ) return decoded, path # get_observer factory def get_observer(fc_kind: FcKind, host: str) -> _FdrReplayObserver: """Return an FDR-replay observer bound to a fixture file. Fixture path: ``${E2E_SITL_REPLAY_DIR}/observer__.json``. Raises ``RuntimeError`` if the env var is unset or the fixture is missing. """ payload, _ = _load_required_json(f"observer_{fc_kind}_{host}.json") return _FdrReplayObserver(fc_kind=fc_kind, host=host, _payload=payload) # read_* surfaces (return [] when fixtures absent) def _parse_ekf_event(item: dict, source: Path) -> EkfDivergenceEvent: try: return EkfDivergenceEvent( monotonic_ms=int(item["monotonic_ms"]), severity=str(item["severity"]), message=str(item["message"]), ) except (KeyError, TypeError, ValueError) as exc: raise RuntimeError( f"sitl_observer EKF divergence fixture malformed at {source}: {exc}" ) from exc def read_ekf_divergence_events() -> list[EkfDivergenceEvent]: """Return EKF divergence events. Empty list when fixture absent.""" return _load_optional_json_list("ekf_divergence_events.json", _parse_ekf_event) def _parse_gps_health(item: dict, source: Path) -> GpsHealthSample: try: return GpsHealthSample( monotonic_ms=int(item["monotonic_ms"]), healthy=bool(item["healthy"]), spoofed=bool(item["spoofed"]), ) except (KeyError, TypeError, ValueError) as exc: raise RuntimeError( f"sitl_observer GPS health fixture malformed at {source}: {exc}" ) from exc def read_gps_health_samples() -> list[GpsHealthSample]: """Return FC-side GPS health samples. Empty list when fixture absent.""" return _load_optional_json_list("gps_health_samples.json", _parse_gps_health) def _parse_consistency_event(item: dict, source: Path) -> ConsistencyCheckEvent: try: return ConsistencyCheckEvent( monotonic_ms=int(item["monotonic_ms"]), passed=bool(item["passed"]), ) except (KeyError, TypeError, ValueError) as exc: raise RuntimeError( f"sitl_observer consistency-check fixture malformed at {source}: {exc}" ) from exc def read_consistency_check_events() -> list[ConsistencyCheckEvent]: """Return visual/satellite consistency-check events. Empty list when fixture absent.""" return _load_optional_json_list( "consistency_check_events.json", _parse_consistency_event ) # prepare_sitl_* — no-ops under FDR-replay def prepare_sitl_cold_boot(host: str, fixture_path: Path) -> None: """No-op under FDR-replay: the cold-boot state IS the fixture file. Raises ``RuntimeError`` if either ``host`` or ``fixture_path`` is empty — these are required for the future live-SITL implementation and surfacing the missing input early avoids confusing downstream errors. """ if not host: raise RuntimeError("prepare_sitl_cold_boot: host must be non-empty") if fixture_path is None: raise RuntimeError("prepare_sitl_cold_boot: fixture_path is required") def prepare_sitl_no_gps(host: str) -> None: """No-op under FDR-replay (the "no GPS" condition is encoded in the fixture).""" if not host: raise RuntimeError("prepare_sitl_no_gps: host must be non-empty") # capture_ap_tlog — returns synthetic tlog path def capture_ap_tlog(host: str, duration_s: float) -> Path: """Return the path to the AP mavproxy tlog fixture for ``host``. Fixture: ``${E2E_SITL_REPLAY_DIR}/ap_tlog_.tlog``. Raises ``RuntimeError`` if env var unset or fixture missing. ``duration_s`` is recorded for future live-mode use but ignored here. """ if duration_s <= 0: raise RuntimeError(f"capture_ap_tlog: duration_s must be positive; got {duration_s}") root = replay_dir() if root is None: raise RuntimeError( f"capture_ap_tlog: {_ENV_VAR} env var not set" ) path = root / f"ap_tlog_{host}.tlog" if not path.exists(): raise RuntimeError( f"capture_ap_tlog: fixture not found at {path}" ) return path def capture_gcs_tlog(host: str, duration_s: float) -> Path: """Return the path to the GCS-side mavproxy-listener tlog for ``host``. Fixture: ``${E2E_SITL_REPLAY_DIR}/gcs_tlog_.tlog``. The tlog captures both directions over the QGC GCS link — SUT→GCS summary bursts (``GLOBAL_POSITION_INT`` + ``NAMED_VALUE_FLOAT``) and GCS→SUT operator commands (``STATUSTEXT`` reloc-hints, ``COMMAND_LONG`` parameter reads, etc.). ``duration_s`` is recorded for future live-mode use but ignored here — under FDR-replay the fixture file IS the captured stream. Raises ``RuntimeError`` if env var unset or fixture missing. """ if duration_s <= 0: raise RuntimeError(f"capture_gcs_tlog: duration_s must be positive; got {duration_s}") root = replay_dir() if root is None: raise RuntimeError( f"capture_gcs_tlog: {_ENV_VAR} env var not set" ) path = root / f"gcs_tlog_{host}.tlog" if not path.exists(): raise RuntimeError( f"capture_gcs_tlog: fixture not found at {path}" ) return path # read_ap_parameter — reads from param-dump JSON def read_ap_parameter(host: str, name: str) -> float | int | str | None: """Read AP parameter ``name`` from the per-host param dump. Fixture: ``${E2E_SITL_REPLAY_DIR}/ap_parameters_.json`` ({name: value}). Raises ``RuntimeError`` if env var unset or fixture missing. Returns ``None`` if the parameter is not in the dump. """ payload, _ = _load_required_json(f"ap_parameters_{host}.json") return payload.get(name) # iNav surfaces def observe_inav_tcp_handshake(host: str, port: int, timeout_s: float) -> TcpHandshakeReport: """Return the recorded TCP handshake outcome for ``(host, port)``. Fixture: ``${E2E_SITL_REPLAY_DIR}/inav_handshake__.json``. Raises ``RuntimeError`` on missing fixture. ``timeout_s`` is recorded for future live-mode use but ignored here. """ if timeout_s <= 0: raise RuntimeError( f"observe_inav_tcp_handshake: timeout_s must be positive; got {timeout_s}" ) payload, path = _load_required_json(f"inav_handshake_{host}_{port}.json") raw = payload.get("established_within_s") if raw is not None and not isinstance(raw, (int, float)): raise RuntimeError( f"sitl_observer inav handshake fixture {path}: " f"`established_within_s` must be a number or null; got {type(raw).__name__}" ) return TcpHandshakeReport(established_within_s=float(raw) if raw is not None else None) def collect_inav_msp_frames(host: str, port: int, window_s: float) -> MspFrameCapture: """Return the recorded MSP frame window for ``(host, port)``. Fixture: ``${E2E_SITL_REPLAY_DIR}/inav_msp_frames__.json`` with shape ``{frames: [{monotonic_ms, function_id}, ...], expected_num_sat: int}``. Raises ``RuntimeError`` if env var unset or fixture missing. """ if window_s <= 0: raise RuntimeError( f"collect_inav_msp_frames: window_s must be positive; got {window_s}" ) payload, path = _load_required_json(f"inav_msp_frames_{host}_{port}.json") raw_frames = payload.get("frames", []) if not isinstance(raw_frames, list): raise RuntimeError( f"sitl_observer inav msp frames fixture {path}: `frames` must be a list" ) frames: list[MspFrameSample] = [] for item in raw_frames: try: frames.append( MspFrameSample( monotonic_ms=int(item["monotonic_ms"]), function_id=int(item["function_id"]), ) ) except (KeyError, TypeError, ValueError) as exc: raise RuntimeError( f"sitl_observer inav msp frames fixture {path}: malformed frame: {exc}" ) from exc expected_num_sat = payload.get("expected_num_sat") if not isinstance(expected_num_sat, int): raise RuntimeError( f"sitl_observer inav msp frames fixture {path}: " f"`expected_num_sat` must be an int; got {type(expected_num_sat).__name__}" ) return MspFrameCapture(frames=frames, expected_num_sat=expected_num_sat) def query_inav_gps_state(host: str) -> InavGpsState: """Return the recorded iNav GPS state snapshot for ``host``. Fixture: ``${E2E_SITL_REPLAY_DIR}/inav_gps_state_.json``. Raises ``RuntimeError`` if env var unset or fixture missing. """ payload, path = _load_required_json(f"inav_gps_state_{host}.json") try: return InavGpsState( fix_type=int(payload["fix_type"]), num_sat=int(payload["num_sat"]), provider=str(payload["provider"]), ) except (KeyError, TypeError, ValueError) as exc: raise RuntimeError( f"sitl_observer iNav GPS state fixture {path} malformed: {exc}" ) from exc