[AZ-399] [AZ-400] C8 TlogReplayFcAdapter + ReplaySink + JsonlReplaySink

Opens E-DEMO-REPLAY (AZ-265): the two C8 strategies that let the
upcoming compose_replay (AZ-401) and gps-denied-replay CLI (AZ-402)
run the production C1-C5 pipeline against a recorded (.tlog, video)
pair without touching live FC I/O.

AZ-400 lands the contract ReplaySink Protocol (emit + close per
replay_protocol.md v1.0.0) and JsonlReplaySink: orjson-serialised
JSONL, fsync-on-close, build-flag gated (BUILD_REPLAY_SINK_JSONL),
double-close idempotent, FDR mirror on open/close. The drifted
AZ-390 stub in interface.py is removed; the canonical Protocol now
lives in replay_sink.py per module-layout.md and is re-exported via
__init__.py. AZ-390 conformance test widened.
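
A minimal sketch of the emit + close shape described above, assuming nothing beyond the standard library. `SinkLike` and `TinyJsonlSink` are hypothetical names used only for illustration, and stdlib `json` stands in for orjson; the real `JsonlReplaySink` additionally checks the build flag, takes a lock, and mirrors lifecycle events to FDR.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Protocol, runtime_checkable


@runtime_checkable
class SinkLike(Protocol):
    """Hypothetical mirror of the two-method emit + close contract."""

    def emit(self, record: dict) -> None: ...
    def close(self) -> None: ...


class TinyJsonlSink:
    """Illustrative only: stdlib json stands in for orjson."""

    def __init__(self, path: Path) -> None:
        # Unbuffered binary handle: each write is handed straight to the OS.
        self._file = open(path, "wb", buffering=0)
        self._closed = False
        self.lines_written = 0

    def emit(self, record: dict) -> None:
        if self._closed:
            raise RuntimeError("emit on closed sink")
        # One JSON object per call, terminated by a single newline.
        self._file.write(json.dumps(record).encode("utf-8") + b"\n")
        self.lines_written += 1

    def close(self) -> None:
        if self._closed:
            return  # a second close is a no-op
        self._closed = True
        try:
            os.fsync(self._file.fileno())  # durability gate on close
        finally:
            self._file.close()  # release the handle even if fsync fails


def demo() -> list:
    """Write two records, close twice, and read the file back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "out.jsonl"
        sink = TinyJsonlSink(path)
        sink.emit({"frame": 1})
        sink.emit({"frame": 2})
        sink.close()
        sink.close()
        return [json.loads(line) for line in path.read_text().splitlines()]
```

Calling `demo()` exercises the double-close path: the second `close()` returns early instead of touching the already released handle.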

AZ-399 lands TlogReplayFcAdapter: full FcAdapter Protocol surface,
build-flag gated (BUILD_TLOG_REPLAY_ADAPTER), pymavlink stream-parse
with bounded pre-scan + fail-fast on missing required messages
(R-DEMO-3), dedicated decode thread feeding the existing AZ-391
SubscriptionBus. Outbound surface raises FcEmitError per Invariant 5;
request_source_set_switch raises SourceSetSwitchNotSupportedError.
Pacing honours Invariant 6 via Clock.sleep_until_ns. time_offset_ms
shifts every emitted received_at per Invariant 8. Non-monotonic
timestamps raise FcOpenError.
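
Two pieces of the adapter logic above can be sketched in isolation: the bounded pre-scan over OR'd message groups and the `time_offset_ms` shift. The function names `missing_groups` and `shifted_received_at_ns` are hypothetical; the group layout is copied from the adapter's `_REQUIRED_MESSAGE_GROUPS`, and a plain iterable of message-type strings stands in for a pymavlink log source.

```python
from __future__ import annotations

from collections import Counter
from typing import Iterable

# Group layout copied from the adapter; any one name satisfies its group.
REQUIRED_GROUPS = (
    ("RAW_IMU", "SCALED_IMU2"),
    ("ATTITUDE",),
    ("GPS_RAW_INT", "GPS2_RAW"),
    ("HEARTBEAT",),
)


def missing_groups(message_types: Iterable[str], budget: int) -> list[tuple[str, ...]]:
    """Scan at most `budget` messages and return the groups never seen."""
    seen: Counter[str] = Counter()
    for index, name in enumerate(message_types):
        if index >= budget:
            break  # bounded scan: stop once the budget is spent
        seen[name] += 1
        if all(any(seen[n] for n in group) for group in REQUIRED_GROUPS):
            break  # every group satisfied, no need to read further
    return [g for g in REQUIRED_GROUPS if not any(seen[n] for n in g)]


def shifted_received_at_ns(timestamp_s: float, time_offset_ms: int) -> int:
    """Convert a float-seconds timestamp to ns and apply the ms offset."""
    return int(timestamp_s * 1_000_000_000) + time_offset_ms * 1_000_000
```

A non-empty result from `missing_groups` corresponds to the fail-fast case; in the adapter that case raises `FcOpenError` rather than returning a list.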

Test coverage: 188 c8_fc_adapter tests pass; 1 skipped (AZ-399 AC-1
500 MB tlog RSS bound, deferred to AZ-404 e2e behind RUN_REPLAY_E2E).
Code review: PASS_WITH_WARNINGS — 1 Medium (mapping logic duplicates
AZ-391 live decoder; intentional today, four behavioural deltas
documented), 2 Low.

Co-authored-by: Cursor <cursoragent@cursor.com>
Oleksandr Bezdieniezhnykh
2026-05-14 05:33:20 +03:00
parent 4eac24f37a
commit fa3742d582
13 changed files with 2688 additions and 23 deletions
@@ -1,10 +1,16 @@
-"""C8 FC + GCS Adapter component — Public API (AZ-390 / E-C8)."""
+"""C8 FC + GCS Adapter component — Public API (AZ-390 / E-C8 + AZ-400 sink).
+The ``ReplaySink`` Protocol is owned by :mod:`replay_sink` per the
+replay contract (``_docs/02_document/contracts/replay/replay_protocol.md``);
+it is re-exported here so consumers continue to see a single Public-API
+surface for the C8 component.
+"""
 from gps_denied_onboard._types.emitted import EmittedExternalPosition
 from gps_denied_onboard.components.c8_fc_adapter.interface import (
     FcAdapter,
     GcsAdapter,
-    ReplaySink,
 )
+from gps_denied_onboard.components.c8_fc_adapter.replay_sink import ReplaySink
 __all__ = ["EmittedExternalPosition", "FcAdapter", "GcsAdapter", "ReplaySink"]
@@ -7,9 +7,13 @@ Concrete strategies (linked at build time per ADR-002):
 Replay extensions (`TlogReplayFcAdapter`, `JsonlReplaySink`) implement
 the same Protocols and live under separate build flags (E-DEMO-REPLAY).
+The `ReplaySink` Protocol itself lives in :mod:`replay_sink` per the
+contract module-layout home — this file owns the live FC + GCS
+Protocols only.
-Public-API restriction: only `FcAdapter`, `GcsAdapter`, `ReplaySink`,
-plus the contract DTOs in `_types/fc.py` and `_types/emitted.py`.
+Public-API restriction: `FcAdapter` and `GcsAdapter` here, plus the
+`ReplaySink` Protocol re-exported from :mod:`replay_sink`, plus the
+contract DTOs in `_types/fc.py` and `_types/emitted.py`.
 """
 from __future__ import annotations
@@ -27,7 +31,7 @@ from gps_denied_onboard._types.fc import (
 )
 from gps_denied_onboard._types.state import EstimatorOutput
-__all__ = ["FcAdapter", "GcsAdapter", "ReplaySink"]
+__all__ = ["FcAdapter", "GcsAdapter"]
 @runtime_checkable
@@ -70,16 +74,3 @@ class GcsAdapter(Protocol):
     def subscribe_operator_commands(self, callback: OperatorCommandCallback) -> Subscription: ...
     def emit_status_text(self, msg: str, severity: Severity) -> None: ...
-@runtime_checkable
-class ReplaySink(Protocol):
-    """Replay-mode estimate sink (e.g. JSONL writer).
-    Lives in the same module so the replay binary's composition root
-    can wire `JsonlReplaySink` alongside the production adapters.
-    Excluded from `__init__.__all__` in production-only builds via the
-    `BUILD_REPLAY_SINK_JSONL` flag.
-    """
-    def write(self, output: EstimatorOutput) -> None: ...
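
The shape change from the removed single-method stub to the two-method contract can be checked structurally. The sketch below uses hypothetical class names (`TwoMethodSink`, `OldShape`, `NewShape`) and relies only on `typing.runtime_checkable`, which verifies that the named methods are present and does not inspect their signatures.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class TwoMethodSink(Protocol):
    """Hypothetical stand-in for the emit + close Protocol shape."""

    def emit(self, output: object) -> None: ...
    def close(self) -> None: ...


class OldShape:
    """Matches the removed stub: a single write() method."""

    def write(self, output: object) -> None:
        pass


class NewShape:
    """Matches the contract shape: emit() plus close()."""

    def emit(self, output: object) -> None:
        pass

    def close(self) -> None:
        pass


# Structural check: only the presence of the method names is tested.
old_conforms = isinstance(OldShape(), TwoMethodSink)
new_conforms = isinstance(NewShape(), TwoMethodSink)
```

Here `old_conforms` is `False` and `new_conforms` is `True`, which is the kind of distinction a widened conformance test could assert on.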
@@ -0,0 +1,367 @@
"""``ReplaySink`` Protocol + ``JsonlReplaySink`` (AZ-400 / E-DEMO-REPLAY).
Owned per ``module-layout.md`` and the replay contract
(``_docs/02_document/contracts/replay/replay_protocol.md`` v1.0.0).
The Protocol replaces the placeholder shape declared in the AZ-390
stub: the contract specifies ``emit(EstimatorOutput) -> None`` plus
``close() -> None`` (with ``fsync``-on-close as the durability gate),
not the single-method ``write(...)`` the stub had.
Build-time gating: the implementation refuses construction unless
``BUILD_REPLAY_SINK_JSONL`` is ON in the environment (Invariant 9 of
the replay contract / ADR-002). Only the ``replay-cli`` binary is
expected to flip the flag ON; airborne / research / operator binaries
keep it OFF.
Serialisation rules (Invariant 7 + AC-3..AC-5 of AZ-400):
- one JSON object per call to :meth:`JsonlReplaySink.emit`, terminated
with a single ``\\n``;
- numpy ``covariance_6x6`` is row-major flattened to 36 floats — the
spec asks for a flat list, not the orjson default nested-list shape;
- :class:`PoseSourceLabel` enum members are serialised as their
``.name`` string (``"SATELLITE_ANCHORED"``, etc.);
- :class:`UUID` ``frame_id`` is serialised as its canonical string form;
- frozen-dataclass nested DTOs (:class:`LatLonAlt`, :class:`Quat`) are
exploded into plain dicts with their dataclass field keys.
The choice of building the JSON-friendly dict explicitly (rather than
relying on :func:`dataclasses.asdict`) keeps the per-field shape
tightly bound to the AC matrix: ``asdict`` does not understand enums or
numpy arrays, and the AC-4 flat-list requirement is incompatible with
``orjson.OPT_SERIALIZE_NUMPY``'s default 2-D output.
"""
from __future__ import annotations
import os
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final, Protocol, runtime_checkable
import numpy as np
import orjson
from gps_denied_onboard.fdr_client.records import FdrRecord
from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.state import EstimatorOutput
from gps_denied_onboard.fdr_client.client import FdrClient
__all__ = [
"JsonlReplaySink",
"ReplaySink",
"ReplaySinkConfigError",
"ReplaySinkError",
"create",
]
_BUILD_FLAG: Final[str] = "BUILD_REPLAY_SINK_JSONL"
_FDR_PRODUCER_ID: Final[str] = "c8_fc_adapter.replay_sink"
_LOG_KIND_OPENED: Final[str] = "replay.sink.opened"
_LOG_KIND_CLOSED: Final[str] = "replay.sink.closed"
_LOG_KIND_EMIT_PROGRESS: Final[str] = "replay.sink.emit_progress"
_LOG_KIND_DOUBLE_CLOSE: Final[str] = "replay.sink.double_close"
_EMIT_PROGRESS_INTERVAL: Final[int] = 1000
class ReplaySinkError(RuntimeError):
"""Base class for runtime ``ReplaySink`` failures.
Raised on construction-time validation (parent dir missing) and on
write-side OS errors. Distinct from :class:`ReplaySinkConfigError`
so callers can opt to catch only build/wiring failures separately.
"""
class ReplaySinkConfigError(ReplaySinkError):
"""Configuration / build-flag failure (sink unavailable in this binary)."""
@runtime_checkable
class ReplaySink(Protocol):
"""Replay-mode :class:`EstimatorOutput` sink.
Concrete implementations must support :meth:`emit` (write one
record) plus :meth:`close` (flush + durably persist). ``close`` is
idempotent: the first call releases the underlying handle and any
subsequent call is a no-op (Invariant: idempotent close).
"""
def emit(self, output: "EstimatorOutput") -> None: ...
def close(self) -> None: ...
def _build_flag_on() -> bool:
"""Return ``True`` when ``BUILD_REPLAY_SINK_JSONL`` is set to a truthy token."""
raw = os.environ.get(_BUILD_FLAG, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}
def _to_jsonable(output: "EstimatorOutput") -> dict[str, Any]:
"""Convert ``EstimatorOutput`` into the JSONL on-wire shape.
The shape mirrors :class:`EstimatorOutput.__dataclass_fields__` 1:1
so AC-3 holds. UUID, enum, numpy, and nested-DTO fields are
converted explicitly per AC-4 / AC-5.
"""
cov = np.asarray(output.covariance_6x6, dtype=np.float64)
return {
"frame_id": str(output.frame_id),
"position_wgs84": {
"lat_deg": output.position_wgs84.lat_deg,
"lon_deg": output.position_wgs84.lon_deg,
"alt_m": output.position_wgs84.alt_m,
},
"orientation_world_T_body": {
"w": output.orientation_world_T_body.w,
"x": output.orientation_world_T_body.x,
"y": output.orientation_world_T_body.y,
"z": output.orientation_world_T_body.z,
},
"velocity_world_mps": list(output.velocity_world_mps),
"covariance_6x6": cov.flatten().tolist(),
"source_label": output.source_label.name,
"last_satellite_anchor_age_ms": int(output.last_satellite_anchor_age_ms),
"smoothed": bool(output.smoothed),
"emitted_at": int(output.emitted_at),
}
class JsonlReplaySink:
"""JSONL-backed :class:`ReplaySink` implementation (AZ-400).
Writes one orjson-serialised line per :meth:`emit` call. The file
is opened with ``buffering=0`` so :meth:`emit` returns only after
the bytes have crossed into the kernel buffer; :meth:`close`
invokes ``fsync`` for the on-disk durability guarantee per
Invariant 7 / AC-7.
Construction is single-shot — calling :meth:`close` twice is safe
(the second call is no-op'd and a DEBUG log fires per AC-8).
Thread-safety: :meth:`emit` and :meth:`close` are guarded by a
lock so concurrent emits remain line-atomic. Replay's runtime
loop is single-threaded, but the lock costs ~100 ns and prevents
test-side surprises.
"""
__slots__ = (
"_output_path",
"_fdr_client",
"_log",
"_fileobj",
"_lock",
"_lines_written",
"_closed",
)
def __init__(
self,
output_path: Path,
fdr_client: "FdrClient",
) -> None:
if not _build_flag_on():
raise ReplaySinkConfigError(
f"{_BUILD_FLAG} is OFF in this binary; JsonlReplaySink is "
"unavailable. Rebuild with the flag set to ON in the "
"replay binary's Dockerfile."
)
if not isinstance(output_path, Path):
raise ReplaySinkError(
f"output_path must be a pathlib.Path; got {type(output_path).__name__}"
)
parent = output_path.parent
if not parent.is_dir():
raise ReplaySinkError(
f"output parent directory does not exist: {parent}"
)
try:
fileobj = open(output_path, "wb", buffering=0) # noqa: SIM115 — owned for the sink lifetime
except OSError as exc:
raise ReplaySinkError(
f"failed to open output file {output_path}: {exc!r}"
) from exc
self._output_path = output_path
self._fdr_client = fdr_client
self._log = get_logger("c8_fc_adapter.replay_sink")
self._fileobj = fileobj
self._lock = threading.Lock()
self._lines_written = 0
self._closed = False
self._log.info(
f"{_LOG_KIND_OPENED}: output_path={output_path}",
extra={
"kind": _LOG_KIND_OPENED,
"kv": {"output_path": str(output_path)},
},
)
self._emit_fdr_event(
log_kind=_LOG_KIND_OPENED,
level="INFO",
msg=f"replay sink opened: {output_path}",
kv={"output_path": str(output_path)},
)
def emit(self, output: "EstimatorOutput") -> None:
"""Write a single ``EstimatorOutput`` as one JSONL line.
Raises :class:`ReplaySinkError` on a closed sink or on an
underlying OS write failure. orjson encode errors are re-raised as
:class:`ReplaySinkError` so the caller does not need to depend on
the orjson module.
"""
with self._lock:
if self._closed:
raise ReplaySinkError("emit on closed JsonlReplaySink")
try:
payload = _to_jsonable(output)
line = orjson.dumps(payload) + b"\n"
except (TypeError, ValueError, orjson.JSONEncodeError) as exc:
raise ReplaySinkError(
f"failed to serialise EstimatorOutput to JSONL: {exc!r}"
) from exc
try:
self._fileobj.write(line)
except OSError as exc:
raise ReplaySinkError(
f"write failed on {self._output_path}: {exc!r}"
) from exc
self._lines_written += 1
if self._lines_written % _EMIT_PROGRESS_INTERVAL == 0:
self._log.debug(
f"{_LOG_KIND_EMIT_PROGRESS}: lines_written={self._lines_written}",
extra={
"kind": _LOG_KIND_EMIT_PROGRESS,
"kv": {"lines_written": self._lines_written},
},
)
def close(self) -> None:
"""Flush + ``fsync`` the underlying file then mark the sink closed.
Idempotent: a second call is no-op'd and a DEBUG record is
emitted (AC-8). An ``fsync`` failure is logged at WARNING rather
than raised, and the file handle is still released so the failure
does not leak a descriptor.
"""
with self._lock:
if self._closed:
self._log.debug(
_LOG_KIND_DOUBLE_CLOSE,
extra={
"kind": _LOG_KIND_DOUBLE_CLOSE,
"kv": {"output_path": str(self._output_path)},
},
)
return
self._closed = True
lines_written = self._lines_written
try:
try:
os.fsync(self._fileobj.fileno())
except OSError as exc:
self._log.warning(
f"replay.sink.fsync_failed: {exc!r}",
extra={
"kind": "replay.sink.fsync_failed",
"kv": {
"output_path": str(self._output_path),
"error": repr(exc),
},
},
)
finally:
try:
self._fileobj.close()
except OSError as exc:
self._log.warning(
f"replay.sink.close_failed: {exc!r}",
extra={
"kind": "replay.sink.close_failed",
"kv": {
"output_path": str(self._output_path),
"error": repr(exc),
},
},
)
self._log.info(
f"{_LOG_KIND_CLOSED}: output_path={self._output_path} "
f"lines_written={lines_written}",
extra={
"kind": _LOG_KIND_CLOSED,
"kv": {
"output_path": str(self._output_path),
"lines_written": lines_written,
},
},
)
self._emit_fdr_event(
log_kind=_LOG_KIND_CLOSED,
level="INFO",
msg=f"replay sink closed: {self._output_path}",
kv={
"output_path": str(self._output_path),
"lines_written": lines_written,
},
)
@property
def lines_written(self) -> int:
"""Total successful :meth:`emit` calls so far (test/debug accessor)."""
with self._lock:
return self._lines_written
def _emit_fdr_event(
self,
*,
log_kind: str,
level: str,
msg: str,
kv: dict[str, Any],
) -> None:
"""Mirror an open/close lifecycle event into FDR as a ``log`` record.
Mirrors the ``_fdr_signing_event`` pattern in the AP outbound
adapter so post-flight forensics see the same event surface as
the structured log. Failures here are deliberately non-fatal
— the structured log is the canonical surface, FDR is the
forensics replay layer.
"""
record = FdrRecord(
schema_version=1,
ts=iso_ts_now(),
producer_id=_FDR_PRODUCER_ID,
kind="log",
payload={
"level": level,
"component": "c8_fc_adapter",
"kind": log_kind,
"msg": msg,
"kv": kv,
},
)
try:
self._fdr_client.enqueue(record)
except Exception as exc:
self._log.debug(
f"replay.sink.fdr_enqueue_failed: {exc!r}",
extra={
"kind": "replay.sink.fdr_enqueue_failed",
"kv": {"error": repr(exc), "downstream_kind": log_kind},
},
)
def create(*, output_path: Path, fdr_client: "FdrClient") -> JsonlReplaySink:
"""Module-level factory entrypoint per project convention.
Mirrors the ``create`` factories used by the C2/C3 strategies so
the AZ-401 ``compose_replay`` wiring resolves the sink through a
single named-symbol contract instead of poking at the class
constructor directly.
"""
return JsonlReplaySink(output_path=output_path, fdr_client=fdr_client)
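
The serialisation rules listed in the module docstring (flat 36-element covariance, enum by `.name`, UUID as string, nested DTOs as plain dicts) can be sketched with the standard library alone. `Label`, `Point`, and `to_jsonable` are hypothetical stand-ins for `PoseSourceLabel`, `LatLonAlt`, and `_to_jsonable`; a nested Python list replaces the numpy array and stdlib `json` replaces orjson.

```python
import json
from dataclasses import dataclass
from enum import Enum
from uuid import UUID


class Label(Enum):
    """Hypothetical stand-in for PoseSourceLabel."""

    SATELLITE_ANCHORED = 1


@dataclass(frozen=True)
class Point:
    """Hypothetical stand-in for the LatLonAlt DTO."""

    lat_deg: float
    lon_deg: float
    alt_m: float


def to_jsonable(frame_id: UUID, position: Point, covariance: list, label: Label) -> dict:
    """Build the JSON-friendly dict explicitly, field by field."""
    return {
        "frame_id": str(frame_id),  # canonical UUID string form
        "position_wgs84": {  # nested DTO exploded into a plain dict
            "lat_deg": position.lat_deg,
            "lon_deg": position.lon_deg,
            "alt_m": position.alt_m,
        },
        # Row-major flatten: 6 rows of 6 values become one list of 36.
        "covariance_6x6": [float(v) for row in covariance for v in row],
        "source_label": label.name,  # enum member name, not its value
    }


covariance = [[float(r * 6 + c) for c in range(6)] for r in range(6)]
record = to_jsonable(
    UUID("12345678-1234-5678-1234-567812345678"),
    Point(50.0, 30.0, 120.0),
    covariance,
    Label.SATELLITE_ANCHORED,
)
line = json.dumps(record) + "\n"  # one object per line
```

Building the dict by hand keeps each field's on-wire shape explicit, which is the same trade-off the docstring gives for not relying on `dataclasses.asdict`.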
@@ -0,0 +1,749 @@
"""``TlogReplayFcAdapter`` (AZ-399 / E-DEMO-REPLAY).
Replay-only :class:`FcAdapter` strategy parsing pymavlink ``.tlog``
files. Implements the full Protocol from
``_docs/02_document/contracts/c8_fc_adapter/fc_adapter_protocol.md``
plus the replay-specific Invariants 5, 6, 8 from
``_docs/02_document/contracts/replay/replay_protocol.md`` (no
out-bound emission, pace honoured by injected :class:`Clock`,
``time_offset_ms`` shift baked at construction).
Build-time gating: the adapter refuses construction unless
``BUILD_TLOG_REPLAY_ADAPTER`` is ``ON``. Only the ``replay-cli``
binary is expected to flip the flag ON; airborne / research /
operator binaries keep it OFF.
Stream-parse design: pymavlink's :class:`mavutil.mavlogfile` already
streams from disk via :meth:`recv_match`. The adapter wraps it in a
pre-scan pass (fail-fast on missing required message types per
R-DEMO-3) followed by a dedicated decode thread that fans messages
out to subscribers via the AZ-391 :class:`SubscriptionBus` (so live
and replay consumers see the same fan-out shape; Invariant 1).
Timestamps: ``FcTelemetryFrame.received_at`` is the **tlog** message
timestamp (``msg._timestamp`` × 1e9, normalised to ns), shifted by
``time_offset_ms`` × 1e6, NOT the wall clock. The injected
:class:`Clock` controls only pacing — when ``pace=ReplayPace.REALTIME``
the decode thread calls :meth:`Clock.sleep_until_ns` between frames;
when ``pace=ReplayPace.ASAP`` the call is a no-op. This keeps
downstream timing logic (smoothing windows, FDR rolling cursors)
deterministic across pace settings.
"""
from __future__ import annotations
import os
import threading
from collections import Counter
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final
from gps_denied_onboard._types.fc import (
AttitudeSample,
FcKind,
FcTelemetryFrame,
FlightState,
FlightStateSignal,
GpsHealth,
GpsStatus,
ImuTelemetrySample,
PortConfig,
Severity,
Subscription,
TelemetryCallback,
TelemetryKind,
)
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard.components.c8_fc_adapter._subscription import SubscriptionBus
from gps_denied_onboard.components.c8_fc_adapter.errors import (
FcAdapterConfigError,
FcEmitError,
FcOpenError,
SourceSetSwitchNotSupportedError,
)
from gps_denied_onboard.fdr_client.records import FdrRecord
from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard._types.emitted import EmittedExternalPosition
from gps_denied_onboard._types.state import EstimatorOutput
from gps_denied_onboard.clock import Clock
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.helpers.wgs_converter import WgsConverter
__all__ = [
"REQUIRED_MESSAGE_TYPES",
"ReplayPace",
"TlogReplayFcAdapter",
]
_BUILD_FLAG: Final[str] = "BUILD_TLOG_REPLAY_ADAPTER"
_FDR_PRODUCER_ID: Final[str] = "c8_fc_adapter.tlog_replay_adapter"
_DECODE_THREAD_NAME: Final[str] = "c8.tlog_replay.decode"
_FRAME_PROGRESS_INTERVAL: Final[int] = 1000
_LOG_KIND_OPENED: Final[str] = "c8.tlog_replay.opened"
_LOG_KIND_MISSING_MESSAGES: Final[str] = "c8.tlog_replay.missing_messages"
_LOG_KIND_FRAME_PROGRESS: Final[str] = "c8.tlog_replay.frame_progress"
_LOG_KIND_NON_MONOTONIC: Final[str] = "c8.tlog_replay.non_monotonic_timestamp"
_LOG_KIND_DECODE_ERROR: Final[str] = "c8.tlog_replay.decode_error"
# Per R-DEMO-3, these four message groups (six MAVLink message types)
# are the minimum the replay binary needs to feed C1 + C5 with
# everything the live AP adapter delivers (IMU + attitude + GPS-health
# + flight state). The IMU pair is OR'd together (RAW_IMU OR
# SCALED_IMU2 satisfies it); the GPS pair is OR'd similarly
# (GPS_RAW_INT OR GPS2_RAW satisfies it).
_REQUIRED_MESSAGE_GROUPS: Final[tuple[tuple[str, ...], ...]] = (
("RAW_IMU", "SCALED_IMU2"),
("ATTITUDE",),
("GPS_RAW_INT", "GPS2_RAW"),
("HEARTBEAT",),
)
# Maps each required group to the downstream consumers that depend
# on it. Used in the fail-fast error message so the operator knows
# which component will starve if the tlog is missing the type.
_GROUP_CONSUMERS: Final[dict[tuple[str, ...], tuple[str, ...]]] = {
("RAW_IMU", "SCALED_IMU2"): ("C1 VIO", "C5 StateEstimator"),
("ATTITUDE",): ("C1 VIO",),
("GPS_RAW_INT", "GPS2_RAW"): ("C5 StateEstimator", "C8 spoof-recovery"),
("HEARTBEAT",): ("C5 StateEstimator", "C8 emit gate"),
}
REQUIRED_MESSAGE_TYPES: Final[tuple[str, ...]] = tuple(
msg for group in _REQUIRED_MESSAGE_GROUPS for msg in group
) + ("STATUSTEXT",)
# Pre-scan budget: ~30 s of telemetry at 200 Hz = 6 000 messages. The
# tlog header carries HEARTBEAT + GPS_RAW_INT in the first second, so
# this is a generous ceiling that still fails fast on truly empty
# tlogs without scanning multi-GB files end-to-end.
_PRESCAN_MAX_MESSAGES: Final[int] = 6000
# MAVLink GPS_FIX_TYPE enum values (subset we map; mirrors AZ-391).
_FIX_TYPE_NO_FIX_OR_NONE: Final[frozenset[int]] = frozenset({0, 1})
_FIX_TYPE_2D: Final[int] = 2
# MAV_STATE values + base_mode flag (mirrors AZ-391's mapping).
_MAV_STATE_UNINIT: Final[int] = 0
_MAV_STATE_BOOT: Final[int] = 1
_MAV_STATE_CALIBRATING: Final[int] = 2
_MAV_STATE_STANDBY: Final[int] = 3
_MAV_STATE_ACTIVE: Final[int] = 4
_MAV_STATE_CRITICAL: Final[int] = 5
_MAV_STATE_EMERGENCY: Final[int] = 6
_MAV_STATE_POWEROFF: Final[int] = 7
_MAV_STATE_FLIGHT_TERMINATION: Final[int] = 8
_MAV_MODE_FLAG_SAFETY_ARMED: Final[int] = 0x80
class ReplayPace(Enum):
"""Replay timing strategy honoured by the injected :class:`Clock`.
``REALTIME`` makes the decode thread sleep between tlog frames so
the runtime loop runs at recorded cadence (live-equivalent for
UI demos). ``ASAP`` skips the sleep — the runtime loop consumes
the tlog as fast as the consumer chain allows (≥ 5× real-time on
Tier-1 hardware per the AZ-265 NFT).
"""
REALTIME = "realtime"
ASAP = "asap"
def _build_flag_on() -> bool:
"""Return ``True`` when ``BUILD_TLOG_REPLAY_ADAPTER`` is a truthy token."""
raw = os.environ.get(_BUILD_FLAG, "")
return raw.strip().lower() in {"on", "1", "true", "yes"}
def _msg_timestamp_ns(msg: Any) -> int:
"""Extract the tlog wall-clock timestamp in ns from a pymavlink msg.
pymavlink decorates every record from a ``mavlogfile`` with a
``_timestamp`` attribute (Unix epoch float seconds with ms-class
resolution). Tests inject the field directly on a SimpleNamespace
so the math here remains a pure function.
"""
raw = getattr(msg, "_timestamp", None)
if raw is None:
raise FcOpenError(
"tlog message missing _timestamp attribute; pymavlink mavlogfile "
"should populate it on every recv_match() return"
)
return int(float(raw) * 1_000_000_000)
class TlogReplayFcAdapter:
"""Replay :class:`FcAdapter` driven by a pymavlink ``.tlog`` stream.
The adapter implements the full :class:`FcAdapter` Protocol so
the C1 + C5 consumers see live-identical wiring. Outbound methods
raise per Invariant 5 (replay is read-only on the FC side; the
runtime loop emits to :class:`ReplaySink` instead).
"""
__slots__ = (
"_tlog_path",
"_target_fc_dialect",
"_clock",
"_wgs_converter",
"_time_offset_ns",
"_pace",
"_fdr_client",
"_log",
"_bus",
"_source",
"_source_factory",
"_decode_thread",
"_stop_flag",
"_opened",
"_closed",
"_lock",
"_warm_start_hint",
"_warm_start_hint_at",
"_latest_flight_state",
"_last_received_at_ns",
"_dispatched_count",
)
def __init__(
self,
*,
tlog_path: Path,
target_fc_dialect: FcKind,
clock: "Clock",
wgs_converter: "WgsConverter",
fdr_client: "FdrClient",
time_offset_ms: int = 0,
pace: ReplayPace = ReplayPace.ASAP,
source_factory: Any | None = None,
) -> None:
if not _build_flag_on():
raise FcAdapterConfigError(
f"{_BUILD_FLAG} is OFF in this binary; "
"TlogReplayFcAdapter is unavailable. Rebuild with the "
"flag set to ON in the replay binary's Dockerfile."
)
if not isinstance(tlog_path, Path):
raise FcAdapterConfigError(
f"tlog_path must be a pathlib.Path; got {type(tlog_path).__name__}"
)
if target_fc_dialect not in (FcKind.ARDUPILOT_PLANE, FcKind.INAV):
raise FcAdapterConfigError(
f"target_fc_dialect must be ARDUPILOT_PLANE or INAV; "
f"got {target_fc_dialect!r}"
)
self._tlog_path = tlog_path
self._target_fc_dialect = target_fc_dialect
self._clock = clock
self._wgs_converter = wgs_converter
self._fdr_client = fdr_client
self._time_offset_ns: int = int(time_offset_ms) * 1_000_000
self._pace = pace
self._log = get_logger("c8_fc_adapter.tlog_replay")
self._bus = SubscriptionBus()
self._source: Any = None
self._source_factory = source_factory
self._decode_thread: threading.Thread | None = None
self._stop_flag = threading.Event()
self._opened = False
self._closed = False
self._lock = threading.Lock()
self._warm_start_hint: LatLonAlt | None = None
self._warm_start_hint_at: int | None = None
self._latest_flight_state: FlightStateSignal | None = None
self._last_received_at_ns: int = -1
self._dispatched_count: int = 0
# ------------------------------------------------------------------
# FcAdapter Protocol implementation
def open(
self,
port: PortConfig | None = None,
signing_key: bytes | None = None,
) -> None:
"""Open the tlog, validate required messages, start the decode thread.
``port`` and ``signing_key`` are accepted for Protocol parity
but unused (replay has no FC link to open and no signing
handshake to perform). The pre-scan pass walks the first
:data:`_PRESCAN_MAX_MESSAGES` records, asserts every required
message group is represented at least once, and then re-opens
the file for the streaming decode pass.
"""
if self._opened:
raise FcOpenError("TlogReplayFcAdapter already opened")
if not self._tlog_path.is_file():
raise FcOpenError(f"tlog file not found: {self._tlog_path}")
message_counts = self._prescan_required_messages()
self._source = self._open_mavlog()
thread = threading.Thread(
target=self._run_decode_loop,
name=_DECODE_THREAD_NAME,
daemon=True,
)
self._decode_thread = thread
self._opened = True
self._log.info(
f"{_LOG_KIND_OPENED}: tlog_path={self._tlog_path} "
f"dialect={self._target_fc_dialect.value} "
f"time_offset_ms={self._time_offset_ns // 1_000_000} "
f"pace={self._pace.value}",
extra={
"kind": _LOG_KIND_OPENED,
"kv": {
"tlog_path": str(self._tlog_path),
"target_fc_dialect": self._target_fc_dialect.value,
"time_offset_ms": self._time_offset_ns // 1_000_000,
"pace": self._pace.value,
"message_counts": dict(message_counts),
},
},
)
self._emit_fdr_event(
log_kind=_LOG_KIND_OPENED,
level="INFO",
msg=f"tlog replay opened: {self._tlog_path}",
kv={
"tlog_path": str(self._tlog_path),
"target_fc_dialect": self._target_fc_dialect.value,
"time_offset_ms": self._time_offset_ns // 1_000_000,
"pace": self._pace.value,
},
)
thread.start()
def close(self) -> None:
"""Stop the decode thread and release the tlog file handle."""
if not self._opened or self._closed:
return
self._closed = True
self._stop_flag.set()
if self._decode_thread is not None and self._decode_thread.is_alive():
self._decode_thread.join(timeout=5.0)
if self._source is not None and hasattr(self._source, "close"):
try:
self._source.close()
except Exception as exc: # pragma: no cover — defensive.
self._log.debug(
f"c8.tlog_replay.source_close_failed: {exc!r}",
extra={
"kind": "c8.tlog_replay.source_close_failed",
"kv": {"error": repr(exc)},
},
)
self._source = None
def subscribe_telemetry(self, callback: TelemetryCallback) -> Subscription:
return self._bus.subscribe(callback)
def emit_external_position(self, output: "EstimatorOutput") -> "EmittedExternalPosition":
# Invariant 5: replay never writes to the FC.
raise FcEmitError("replay adapter does not emit to FC")
def emit_status_text(self, msg: str, severity: Severity) -> None:
# Invariant 5: replay never writes to the FC.
raise FcEmitError("replay adapter does not emit to FC")
def request_source_set_switch(self) -> None:
raise SourceSetSwitchNotSupportedError(
"TlogReplayFcAdapter cannot issue MAV_CMD_SET_EKF_SOURCE_SET; "
"replay reads telemetry from a recorded file"
)
def current_flight_state(self) -> FlightStateSignal:
with self._lock:
latest = self._latest_flight_state
if latest is not None:
return latest
return FlightStateSignal(
state=FlightState.INIT,
last_valid_gps_hint_wgs84=None,
last_valid_gps_age_ms=None,
captured_at=self._clock.monotonic_ns(),
)
# ------------------------------------------------------------------
# Pre-scan and source open
def _prescan_required_messages(self) -> Counter[str]:
"""Walk the head of the tlog and assert every required group is present.
Closes the pre-scan handle before returning. The streaming
decode pass opens a fresh handle so the file pointer is at
position zero.
"""
scan_source = self._open_mavlog()
seen: Counter[str] = Counter()
try:
try:
for _ in range(_PRESCAN_MAX_MESSAGES):
msg = scan_source.recv_match(
type=list(REQUIRED_MESSAGE_TYPES),
blocking=False,
)
if msg is None:
break
msg_type = self._safe_msg_type(msg)
if msg_type:
seen[msg_type] += 1
if self._all_groups_satisfied(seen):
break
except Exception as exc:
raise FcOpenError(
f"tlog pre-scan failed on {self._tlog_path}: {exc!r}"
) from exc
finally:
if hasattr(scan_source, "close"):
try:
scan_source.close()
except Exception: # pragma: no cover — defensive.
pass
missing = [
group for group in _REQUIRED_MESSAGE_GROUPS
if not any(seen.get(name, 0) > 0 for name in group)
]
if missing:
self._raise_missing_messages(missing)
return seen
def _open_mavlog(self) -> Any:
"""Open the tlog via the configured source factory or pymavlink."""
if self._source_factory is not None:
return self._source_factory(str(self._tlog_path))
# Lazy import so test paths that pass a ``source_factory`` do
# not require pymavlink at module-import time.
try:
from pymavlink import mavutil # type: ignore[import-not-found]
except ImportError as exc:
raise FcOpenError(
"pymavlink is required for TlogReplayFcAdapter but is not "
"importable in this binary"
) from exc
# The dialect string must match pymavlink's bundled dialect
# name. ArduPilot Plane uses the ``ardupilotmega`` dialect;
# iNav telemetry rides the AP MAVLink dialect per
# RESTRICT-COMM-2 (the iNav-side adapter is MSP2; here we are
# parsing the GCS telemetry channel that always speaks
# MAVLink).
dialect = "ardupilotmega"
return mavutil.mavlink_connection(
str(self._tlog_path),
dialect=dialect,
mavlink_version="2.0",
)
def _all_groups_satisfied(self, seen: Counter[str]) -> bool:
return all(any(seen.get(name, 0) > 0 for name in group) for group in _REQUIRED_MESSAGE_GROUPS)
def _raise_missing_messages(self, missing: list[tuple[str, ...]]) -> None:
rendered_missing = [list(group) for group in missing]
consumers: list[str] = []
for group in missing:
consumers.extend(_GROUP_CONSUMERS.get(group, ()))
# De-duplicate while preserving order.
seen: set[str] = set()
consumer_list = [c for c in consumers if not (c in seen or seen.add(c))]
message = (
f"tlog missing required messages: {rendered_missing}; "
f"consumed by: {consumer_list}"
)
self._log.error(
f"{_LOG_KIND_MISSING_MESSAGES}: {message}",
extra={
"kind": _LOG_KIND_MISSING_MESSAGES,
"kv": {
"missing_groups": rendered_missing,
"consumers": consumer_list,
"tlog_path": str(self._tlog_path),
},
},
)
self._emit_fdr_event(
log_kind=_LOG_KIND_MISSING_MESSAGES,
level="ERROR",
msg=message,
kv={
"missing_groups": rendered_missing,
"consumers": consumer_list,
"tlog_path": str(self._tlog_path),
},
)
raise FcOpenError(message)
@staticmethod
def _safe_msg_type(msg: Any) -> str:
try:
if hasattr(msg, "get_type"):
return str(msg.get_type())
except Exception:
return ""
return type(msg).__name__
# ------------------------------------------------------------------
# Decode loop
def _run_decode_loop(self) -> None:
try:
while not self._stop_flag.is_set():
if self._source is None:
return
try:
msg = self._source.recv_match(
type=list(REQUIRED_MESSAGE_TYPES),
blocking=False,
)
except Exception as exc:
self._log.warning(
f"{_LOG_KIND_DECODE_ERROR}: {exc!r}",
extra={
"kind": _LOG_KIND_DECODE_ERROR,
"kv": {"error": repr(exc)},
},
)
return
if msg is None:
# End of tlog.
return
self.feed_one_message(msg)
finally:
self._stop_flag.set()
def feed_one_message(self, msg: Any) -> bool:
"""Decode one tlog message and dispatch to subscribers if it maps.
Test-friendly entrypoint mirroring AZ-391's
:meth:`PymavlinkInboundDecoder.feed_one_message`. Production
replay uses :meth:`_run_decode_loop`.
"""
if msg is None:
return False
try:
msg_type = self._safe_msg_type(msg)
if msg_type in ("RAW_IMU", "SCALED_IMU2"):
return self._handle_imu(msg)
if msg_type == "ATTITUDE":
return self._handle_attitude(msg)
if msg_type in ("GPS_RAW_INT", "GPS2_RAW"):
return self._handle_gps(msg)
if msg_type == "HEARTBEAT":
return self._handle_heartbeat(msg)
if msg_type == "STATUSTEXT":
# Sentinel-only path for live; no replay-side spoof
# promotion needed (replay carries the recorded stream
# verbatim).
return False
except FcOpenError:
raise
except Exception as exc:
self._log.debug(
f"{_LOG_KIND_DECODE_ERROR}: msg_type={msg_type} {exc!r}",
extra={
"kind": _LOG_KIND_DECODE_ERROR,
"kv": {"msg_type": msg_type, "error": repr(exc)},
},
)
return False
return False

    def _handle_imu(self, msg: Any) -> bool:
        # time_usec is in microseconds; convert to nanoseconds.
        # NOTE: in the MAVLink common message set SCALED_IMU2 carries
        # time_boot_ms rather than time_usec, so ts_ns falls back to 0
        # for that message type.
        sensor_ts_ns = int(getattr(msg, "time_usec", 0)) * 1000
        accel = (
            float(msg.xacc),
            float(msg.yacc),
            float(msg.zacc),
        )
        gyro = (
            float(msg.xgyro),
            float(msg.ygyro),
            float(msg.zgyro),
        )
        payload = ImuTelemetrySample(
            ts_ns=sensor_ts_ns,
            accel_xyz=accel,
            gyro_xyz=gyro,
        )
        return self._dispatch(TelemetryKind.IMU_SAMPLE, payload, msg=msg)

    def _handle_attitude(self, msg: Any) -> bool:
        # time_boot_ms is in milliseconds; convert to nanoseconds.
        sensor_ts_ns = int(getattr(msg, "time_boot_ms", 0)) * 1_000_000
        payload = AttitudeSample(
            ts_ns=sensor_ts_ns,
            roll_rad=float(msg.roll),
            pitch_rad=float(msg.pitch),
            yaw_rad=float(msg.yaw),
        )
        return self._dispatch(TelemetryKind.ATTITUDE, payload, msg=msg)

    def _handle_gps(self, msg: Any) -> bool:
        fix_type = int(getattr(msg, "fix_type", 0))
        status = self._map_fix_type(fix_type)
        captured_at = _msg_timestamp_ns(msg) + self._time_offset_ns
        payload = GpsHealth(status=status, fix_age_ms=0, captured_at=captured_at)
        # AC-5.1 warm-start hint cache (mirrors AZ-391 live path).
        # Only a 3D fix or better (fix_type >= 3) seeds the hint.
        if fix_type >= 3:
            # lat/lon are degE7; alt is millimetres.
            lat_deg = int(getattr(msg, "lat", 0)) / 1e7
            lon_deg = int(getattr(msg, "lon", 0)) / 1e7
            alt_m = int(getattr(msg, "alt", 0)) / 1000.0
            with self._lock:
                if self._warm_start_hint is None:
                    self._warm_start_hint = LatLonAlt(lat_deg, lon_deg, alt_m)
                    self._warm_start_hint_at = captured_at
        return self._dispatch(TelemetryKind.GPS_HEALTH, payload, msg=msg)

    def _handle_heartbeat(self, msg: Any) -> bool:
        captured_at = _msg_timestamp_ns(msg) + self._time_offset_ns
        state = self._map_mav_state(
            system_status=int(getattr(msg, "system_status", 0)),
            base_mode=int(getattr(msg, "base_mode", 0)),
        )
        with self._lock:
            hint = self._warm_start_hint
            hint_at = self._warm_start_hint_at
        last_age_ms: int | None = None
        if hint_at is not None:
            last_age_ms = max(0, (captured_at - hint_at) // 1_000_000)
        payload = FlightStateSignal(
            state=state,
            last_valid_gps_hint_wgs84=hint,
            last_valid_gps_age_ms=last_age_ms,
            captured_at=captured_at,
        )
        with self._lock:
            self._latest_flight_state = payload
        return self._dispatch(TelemetryKind.MAV_STATE, payload, msg=msg)

    def _dispatch(
        self,
        kind: TelemetryKind,
        payload: ImuTelemetrySample | AttitudeSample | GpsHealth | FlightStateSignal,
        *,
        msg: Any,
    ) -> bool:
        received_at = _msg_timestamp_ns(msg) + self._time_offset_ns
        # Per Invariant 7 / contract Invariant 3: tlog timestamps must
        # be non-decreasing. A backward step almost always indicates a
        # corrupt or concatenated tlog; we raise so replay determinism
        # is hard-failed (mirrors the AZ-398 TlogDerivedClock policy).
        if self._last_received_at_ns >= 0 and received_at < self._last_received_at_ns:
            self._log.error(
                f"{_LOG_KIND_NON_MONOTONIC}: kind={kind.name} "
                f"prev_ns={self._last_received_at_ns} this_ns={received_at}",
                extra={
                    "kind": _LOG_KIND_NON_MONOTONIC,
                    "kv": {
                        "telemetry_kind": kind.name,
                        "prev_ns": self._last_received_at_ns,
                        "this_ns": received_at,
                    },
                },
            )
            raise FcOpenError(
                f"tlog non-monotonic timestamp at kind={kind.name}: "
                f"{received_at} ns followed {self._last_received_at_ns} ns"
            )
        self._last_received_at_ns = received_at
        if self._pace is ReplayPace.REALTIME:
            try:
                self._clock.sleep_until_ns(received_at)
            except Exception as exc:  # pragma: no cover - defensive.
                self._log.debug(
                    f"c8.tlog_replay.clock_sleep_failed: {exc!r}",
                    extra={
                        "kind": "c8.tlog_replay.clock_sleep_failed",
                        "kv": {"error": repr(exc)},
                    },
                )
        # `signed=False` for replay: even if the source tlog carried
        # signed AP frames, the replay binary cannot prove signature
        # validity without the original key (D-CROSS-CVE-1 risk). The
        # downstream consumers treat replay frames as the same
        # provenance class as unsigned live frames.
        frame = FcTelemetryFrame(
            kind=kind,
            payload=payload,
            received_at=received_at,
            signed=False,
        )
        self._bus.dispatch(frame)
        self._dispatched_count += 1
        if self._dispatched_count % _FRAME_PROGRESS_INTERVAL == 0:
            self._log.debug(
                f"{_LOG_KIND_FRAME_PROGRESS}: dispatched={self._dispatched_count}",
                extra={
                    "kind": _LOG_KIND_FRAME_PROGRESS,
                    "kv": {"dispatched": self._dispatched_count},
                },
            )
        return True

    # ------------------------------------------------------------------
    # Mapping helpers (mirror AZ-391 live decoder)

    @staticmethod
    def _map_fix_type(fix_type: int) -> GpsStatus:
        if fix_type in _FIX_TYPE_NO_FIX_OR_NONE:
            return GpsStatus.NO_FIX
        if fix_type == _FIX_TYPE_2D:
            return GpsStatus.DEGRADED
        return GpsStatus.STABLE

    @staticmethod
    def _map_mav_state(*, system_status: int, base_mode: int) -> FlightState:
        if system_status in (_MAV_STATE_UNINIT, _MAV_STATE_BOOT, _MAV_STATE_CALIBRATING):
            return FlightState.INIT
        if system_status in (
            _MAV_STATE_CRITICAL,
            _MAV_STATE_EMERGENCY,
            _MAV_STATE_FLIGHT_TERMINATION,
        ):
            return FlightState.FAILED
        if system_status == _MAV_STATE_ACTIVE:
            return FlightState.IN_FLIGHT
        if system_status == _MAV_STATE_STANDBY:
            if base_mode & _MAV_MODE_FLAG_SAFETY_ARMED:
                return FlightState.ARMED
            return FlightState.ON_GROUND
        if system_status == _MAV_STATE_POWEROFF:
            return FlightState.ON_GROUND
        return FlightState.INIT

    # ------------------------------------------------------------------
    # FDR mirror (open / fail-fast events)

    def _emit_fdr_event(
        self,
        *,
        log_kind: str,
        level: str,
        msg: str,
        kv: dict[str, Any],
    ) -> None:
        record = FdrRecord(
            schema_version=1,
            ts=iso_ts_now(),
            producer_id=_FDR_PRODUCER_ID,
            kind="log",
            payload={
                "level": level,
                "component": "c8_fc_adapter",
                "kind": log_kind,
                "msg": msg,
                "kv": kv,
            },
        )
        try:
            self._fdr_client.enqueue(record)
        except Exception as exc:
            self._log.debug(
                f"c8.tlog_replay.fdr_enqueue_failed: {exc!r}",
                extra={
                    "kind": "c8.tlog_replay.fdr_enqueue_failed",
                    "kv": {"error": repr(exc), "downstream_kind": log_kind},
                },
            )