Files
Oleksandr Bezdieniezhnykh fa3742d582 [AZ-399] [AZ-400] C8 TlogReplayFcAdapter + ReplaySink + JsonlReplaySink
Opens E-DEMO-REPLAY (AZ-265): the two C8 strategies that let the
upcoming compose_replay (AZ-401) and gps-denied-replay CLI (AZ-402)
run the production C1-C5 pipeline against a recorded (.tlog, video)
pair without touching live FC I/O.

AZ-400 lands the contract ReplaySink Protocol (emit + close per
replay_protocol.md v1.0.0) and JsonlReplaySink: orjson-serialised
JSONL, fsync-on-close, build-flag gated (BUILD_REPLAY_SINK_JSONL),
double-close idempotent, FDR mirror on open/close. The drifted
AZ-390 stub in interface.py is removed; the canonical Protocol now
lives in replay_sink.py per module-layout.md and is re-exported via
__init__.py. AZ-390 conformance test widened.

AZ-399 lands TlogReplayFcAdapter: full FcAdapter Protocol surface,
build-flag gated (BUILD_TLOG_REPLAY_ADAPTER), pymavlink stream-parse
with bounded pre-scan + fail-fast on missing required messages
(R-DEMO-3), dedicated decode thread feeding the existing AZ-391
SubscriptionBus. Outbound surface raises FcEmitError per Invariant 5;
request_source_set_switch raises SourceSetSwitchNotSupportedError.
Pacing honours Invariant 6 via Clock.sleep_until_ns. time_offset_ms
shifts every emitted received_at per Invariant 8. Non-monotonic
timestamps raise FcOpenError.

Test coverage: 188 c8_fc_adapter tests pass; 1 skipped (AZ-399 AC-1
500 MB tlog RSS bound, deferred to AZ-404 e2e behind RUN_REPLAY_E2E).
Code review: PASS_WITH_WARNINGS — 1 Medium (mapping logic duplicates
AZ-391 live decoder; intentional today, four behavioural deltas
documented), 2 Low.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 05:33:20 +03:00

533 lines
16 KiB
Python

"""AZ-390 — FcAdapter + GcsAdapter Protocols + DTOs + factories + composition.
Covers all 10 ACs:
1. Protocol conformance via @runtime_checkable
2. DTOs frozen + slots
3. Enum membership
4. Factory rejects build-flag OFF
5. Factory rejects unknown strategy at config-load
6. Single-writer outbound thread
7. GcsAdapter factory parallel coverage
8. Public API re-exports
9. Error hierarchy catchability
10. INFO log on build
"""
from __future__ import annotations

import dataclasses
import logging
import threading
from collections.abc import Callable, Iterator

import pytest

from gps_denied_onboard._types.emitted import EmittedExternalPosition
from gps_denied_onboard._types.fc import (
    AttitudeSample,
    FcKind,
    FcTelemetryFrame,
    FlightState,
    FlightStateSignal,
    GpsHealth,
    GpsStatus,
    ImuTelemetrySample,
    OperatorCommand,
    PortConfig,
    Severity,
    Subscription,
    TelemetryKind,
)
from gps_denied_onboard._types.state import EstimatorOutput
from gps_denied_onboard.components.c8_fc_adapter import (
    FcAdapter,
    GcsAdapter,
    ReplaySink,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import (
    FcAdapterConfigError,
    FcAdapterError,
    FcEmitError,
    FcOpenError,
    GcsAdapterConfigError,
    GcsAdapterError,
    GcsEmitError,
    SigningHandshakeError,
    SourceSetSwitchError,
    SourceSetSwitchNotSupportedError,
)
from gps_denied_onboard.config import Config, ConfigError, FcConfig, GcsConfig
from gps_denied_onboard.runtime_root.fc_factory import (
    OutboundThreadAlreadyBoundError,
    bind_outbound_emit_thread,
    build_fc_adapter,
    build_gcs_adapter,
    clear_outbound_thread_binding,
    clear_strategy_registries,
    register_fc_adapter,
    register_gcs_adapter,
)
@pytest.fixture(autouse=True)
def _isolate_factory_state() -> Iterator[None]:
    """Clear the strategy registries and outbound-thread binding around each test.

    Both clears run before the test body and again after it, so a test
    neither inherits nor leaves behind registrations or a thread binding.

    Yields:
        None: control passes to the test between the two clears.
    """
    # Arrange — every test starts from a clean registry + thread binding.
    clear_strategy_registries()
    clear_outbound_thread_binding()
    yield
    # Teardown — same two clears, so nothing carries over to the next test.
    clear_strategy_registries()
    clear_outbound_thread_binding()
# ----------------------------------------------------------------------
# AC-1: Protocol conformance
class _FcStub:
def open(self, port: PortConfig, signing_key: bytes | None) -> None: ...
def close(self) -> None: ...
def subscribe_telemetry(self, callback: Callable[[FcTelemetryFrame], None]) -> Subscription:
class _Sub:
def cancel(self) -> None: ...
return _Sub()
def emit_external_position(self, output: EstimatorOutput) -> EmittedExternalPosition:
return EmittedExternalPosition(
fc_kind=FcKind.ARDUPILOT_PLANE,
horiz_accuracy_m=1.0,
source_label=output.source_label,
emitted_at=0,
sequence_number=0,
)
def emit_status_text(self, msg: str, severity: Severity) -> None: ...
def request_source_set_switch(self) -> None: ...
def current_flight_state(self) -> FlightStateSignal:
return FlightStateSignal(
state=FlightState.INIT,
last_valid_gps_hint_wgs84=None,
last_valid_gps_age_ms=None,
captured_at=0,
)
class _GcsStub:
def open(self, port: PortConfig) -> None: ...
def close(self) -> None: ...
def emit_summary(self, output: EstimatorOutput) -> None: ...
def subscribe_operator_commands(
self, callback: Callable[[OperatorCommand], None]
) -> Subscription:
class _Sub:
def cancel(self) -> None: ...
return _Sub()
def emit_status_text(self, msg: str, severity: Severity) -> None: ...
def test_ac1_fc_protocol_conformance() -> None:
    """AC-1: the full-surface FC stub passes the runtime ``FcAdapter`` check."""
    # Arrange
    candidate = _FcStub()

    # Assert
    assert isinstance(candidate, FcAdapter)
def test_ac1_gcs_protocol_conformance() -> None:
    """AC-1: the full-surface GCS stub passes the runtime ``GcsAdapter`` check."""
    # Arrange
    candidate = _GcsStub()

    # Assert
    assert isinstance(candidate, GcsAdapter)
def test_ac1_replay_sink_protocol_conformance() -> None:
    """AC-1: an object with both ``emit`` and ``close`` is a ``ReplaySink``."""

    # Arrange — AZ-400 widened the Protocol to the contract shape
    # (`emit(EstimatorOutput) -> None` + `close() -> None`).
    class _TwoMethodSink:
        def emit(self, output: EstimatorOutput) -> None:
            return None

        def close(self) -> None:
            return None

    sink = _TwoMethodSink()

    # Assert
    assert isinstance(sink, ReplaySink)
def test_ac1_replay_sink_rejects_partial_surface() -> None:
    """AC-1: an object with only one of ``emit`` / ``close`` is not a ``ReplaySink``."""

    # Arrange — each class drops exactly one of the two methods, so neither
    # satisfies the widened Protocol; this guards against AZ-390-style stub
    # drift.
    class _EmitOnly:
        def emit(self, output: EstimatorOutput) -> None:
            return None

    class _CloseOnly:
        def close(self) -> None:
            return None

    emit_only = _EmitOnly()
    close_only = _CloseOnly()

    # Assert
    assert not isinstance(emit_only, ReplaySink)
    assert not isinstance(close_only, ReplaySink)
def test_ac1_protocol_rejects_missing_method() -> None:
    """AC-1: an object with only ``open`` and ``close`` is not an ``FcAdapter``."""

    # Arrange — only two of the methods `_FcStub` provides are present here.
    class _OpenCloseOnly:
        def open(self, port: PortConfig, signing_key: bytes | None) -> None:
            return None

        def close(self) -> None:
            return None

    partial = _OpenCloseOnly()

    # Assert
    assert not isinstance(partial, FcAdapter)
# ----------------------------------------------------------------------
# AC-2: DTOs frozen + slots
@pytest.mark.parametrize(
    "dto, init_kwargs",
    [
        (PortConfig, {"device": "/dev/ttyTHS1", "baud": 921600, "fc_kind": FcKind.ARDUPILOT_PLANE}),
        (
            ImuTelemetrySample,
            {"ts_ns": 0, "accel_xyz": (0.0, 0.0, 0.0), "gyro_xyz": (0.0, 0.0, 0.0)},
        ),
        (AttitudeSample, {"ts_ns": 0, "roll_rad": 0.0, "pitch_rad": 0.0, "yaw_rad": 0.0}),
        (GpsHealth, {"status": GpsStatus.STABLE, "fix_age_ms": 0, "captured_at": 0}),
        (
            FlightStateSignal,
            {
                "state": FlightState.INIT,
                "last_valid_gps_hint_wgs84": None,
                "last_valid_gps_age_ms": None,
                "captured_at": 0,
            },
        ),
        (OperatorCommand, {"command": "test", "payload": {}, "received_at": 0}),
        (
            EmittedExternalPosition,
            {
                "fc_kind": FcKind.ARDUPILOT_PLANE,
                "horiz_accuracy_m": 1.0,
                "source_label": "visual_propagated",
                "emitted_at": 0,
                "sequence_number": 0,
            },
        ),
    ],
)
def test_ac2_dto_frozen_and_slotted(dto: type, init_kwargs: dict) -> None:
    """AC-2: each parametrized DTO rejects attribute assignment and declares slots.

    Args:
        dto: The DTO class under test.
        init_kwargs: Keyword arguments used to construct one instance; the
            first key is also the field the test tries to overwrite.
    """
    # Arrange
    instance = dto(**init_kwargs)
    # Chosen outside the `pytest.raises` block so that the assignment is the
    # only statement able to satisfy the expected exception.
    any_field = next(iter(init_kwargs))

    # Assert — frozen
    with pytest.raises(dataclasses.FrozenInstanceError):
        setattr(instance, any_field, init_kwargs[any_field])

    # Assert — slots
    assert hasattr(dto, "__slots__")
    assert len(dto.__slots__) > 0
def test_ac2_fc_telemetry_frame_dto_frozen() -> None:
    """AC-2: ``FcTelemetryFrame`` rejects attribute assignment and declares slots."""
    # Arrange
    zero_vec = (0.0, 0.0, 0.0)
    imu_payload = ImuTelemetrySample(ts_ns=0, accel_xyz=zero_vec, gyro_xyz=zero_vec)
    frame = FcTelemetryFrame(
        kind=TelemetryKind.IMU_SAMPLE,
        payload=imu_payload,
        received_at=0,
        signed=False,
    )

    # Assert — frozen
    with pytest.raises(dataclasses.FrozenInstanceError):
        frame.signed = True  # type: ignore[misc]

    # Assert — slots
    assert hasattr(FcTelemetryFrame, "__slots__")
# ----------------------------------------------------------------------
# AC-3: Enum membership
def test_ac3_fc_kind_has_two_members() -> None:
    """AC-3: ``FcKind`` exposes exactly the expected member names.

    NOTE(review): the function name says "two members" but three names are
    asserted below; the name looks stale and is kept only so the test ID
    does not change — confirm before renaming.
    """
    # Arrange
    expected_names = {"ARDUPILOT_PLANE", "INAV", "GCS_QGC"}

    # Act
    actual_names = {member.name for member in FcKind}

    # Assert
    assert actual_names == expected_names
def test_ac3_flight_state_has_five_members() -> None:
    """AC-3: ``FlightState`` exposes exactly the five expected member names."""
    # Arrange
    expected_names = {"INIT", "ARMED", "IN_FLIGHT", "ON_GROUND", "FAILED"}

    # Act
    actual_names = {member.name for member in FlightState}

    # Assert
    assert actual_names == expected_names
def test_ac3_gps_status_has_five_members() -> None:
    """AC-3: ``GpsStatus`` exposes exactly the five expected member names."""
    # Arrange
    expected_names = {
        "NO_FIX",
        "DEGRADED",
        "STABLE",
        "STABLE_NON_SPOOFED",
        "SPOOFED",
    }

    # Act
    actual_names = {member.name for member in GpsStatus}

    # Assert
    assert actual_names == expected_names
def test_ac3_severity_values_mirror_mavlink() -> None:
    """AC-3: ``Severity`` INFO / WARNING / ERROR carry the values 6 / 4 / 3."""
    # Arrange
    info_value = Severity.INFO.value
    warning_value = Severity.WARNING.value
    error_value = Severity.ERROR.value

    # Assert
    assert info_value == 6
    assert warning_value == 4
    assert error_value == 3
def test_ac3_telemetry_kind_has_four_members() -> None:
    """AC-3: ``TelemetryKind`` exposes exactly the four expected member names."""
    # Arrange
    expected_names = {
        "IMU_SAMPLE",
        "ATTITUDE",
        "GPS_HEALTH",
        "MAV_STATE",
    }

    # Act
    actual_names = {member.name for member in TelemetryKind}

    # Assert
    assert actual_names == expected_names
# ----------------------------------------------------------------------
# AC-4: Factory rejects build-flag OFF
def test_ac4_fc_factory_rejects_build_flag_off(monkeypatch: pytest.MonkeyPatch) -> None:
    """AC-4: with ``BUILD_FC_ARDUPILOT_PLANE=OFF`` the FC factory raises a config error."""

    def _make_stub(**_ignored: object) -> _FcStub:
        return _FcStub()

    # Arrange — the strategy IS registered; only the build flag blocks it.
    monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "OFF")
    register_fc_adapter("ardupilot_plane", _make_stub)
    cfg = Config(fc=FcConfig(adapter="ardupilot_plane"))

    # Act + Assert
    expected_message = r"BUILD_FC_ARDUPILOT_PLANE is OFF"
    with pytest.raises(FcAdapterConfigError, match=expected_message):
        build_fc_adapter(cfg)
def test_ac4_fc_factory_passes_when_flag_on(monkeypatch: pytest.MonkeyPatch) -> None:
    """AC-4: with ``BUILD_FC_ARDUPILOT_PLANE=ON`` the FC factory returns an ``FcAdapter``."""

    def _make_stub(**_ignored: object) -> _FcStub:
        return _FcStub()

    # Arrange
    monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
    register_fc_adapter("ardupilot_plane", _make_stub)
    cfg = Config(fc=FcConfig(adapter="ardupilot_plane"))

    # Act
    built = build_fc_adapter(cfg)

    # Assert
    assert isinstance(built, FcAdapter)
# ----------------------------------------------------------------------
# AC-5: Factory rejects unknown strategy at config-load
def test_ac5_unknown_fc_strategy_rejected_at_config_load() -> None:
    """AC-5: constructing ``FcConfig`` with an unknown adapter name raises ``ConfigError``."""
    # Arrange
    expected_message = r"not in \['ardupilot_plane', 'inav'\]"

    # Act + Assert — happens at construction time, not at build time
    with pytest.raises(ConfigError, match=expected_message):
        FcConfig(adapter="garbage_fc")
def test_ac5_unknown_gcs_strategy_rejected_at_config_load() -> None:
    """AC-5: constructing ``GcsConfig`` with an unknown adapter name raises ``ConfigError``."""
    # Arrange
    expected_message = r"not in \['qgc_mavlink'\]"

    # Act + Assert
    with pytest.raises(ConfigError, match=expected_message):
        GcsConfig(adapter="garbage_gcs")
def test_ac5_inav_signing_key_combination_rejected() -> None:
    """AC-5: ``FcConfig`` rejects the ``inav`` adapter combined with a signing key source."""
    # Arrange
    expected_message = r"RESTRICT-COMM-2"

    # Act + Assert — iNav with signing key is RESTRICT-COMM-2 violation
    with pytest.raises(ConfigError, match=expected_message):
        FcConfig(adapter="inav", signing_key_source="ephemeral_per_flight")
def test_ac5_unregistered_strategy_rejected_at_build_with_clear_message() -> None:
    """AC-5: building a known-but-unregistered strategy raises ``FcAdapterConfigError``."""
    # Arrange — strategy is in the known set but the binary didn't register a factory
    fc_section = FcConfig(adapter="inav", signing_key_source="none")
    cfg = Config(fc=fc_section)

    # Act + Assert
    expected_message = r"not registered"
    with pytest.raises(FcAdapterConfigError, match=expected_message):
        build_fc_adapter(cfg)
# ----------------------------------------------------------------------
# AC-6: Single-writer outbound thread
def test_ac6_first_bind_returns_thread_ident() -> None:
    """AC-6: binding with no explicit ident returns the calling thread's ident."""
    # Act
    bound_ident = bind_outbound_emit_thread()

    # Assert
    assert threading.get_ident() == bound_ident
def test_ac6_second_bind_from_different_thread_rejected() -> None:
    """AC-6: re-binding with a different ident raises ``OutboundThreadAlreadyBoundError``.

    The second bind runs on a worker thread; the exception it raises is
    collected there and inspected after the worker has been joined.
    """
    # Arrange
    bind_outbound_emit_thread(thread_ident=1)
    captured: list[BaseException] = []

    def _rebind_with_other_ident() -> None:
        try:
            bind_outbound_emit_thread(thread_ident=2)
        except OutboundThreadAlreadyBoundError as rejection:
            captured.append(rejection)

    # Act
    worker = threading.Thread(target=_rebind_with_other_ident)
    worker.start()
    worker.join()

    # Assert — exactly one rejection, and it is also a RuntimeError
    assert len(captured) == 1
    assert isinstance(captured[0], RuntimeError)
def test_ac6_rebind_same_thread_idempotent() -> None:
    """AC-6: binding the same ident twice returns that ident both times."""
    # Arrange
    initial = bind_outbound_emit_thread(thread_ident=42)

    # Act
    repeated = bind_outbound_emit_thread(thread_ident=42)

    # Assert — re-binding the SAME thread is idempotent (composition root may run twice in tests)
    assert initial == repeated == 42
# ----------------------------------------------------------------------
# AC-7: GcsAdapter factory parallel coverage
def test_ac7_gcs_factory_resolves_known_strategy(monkeypatch: pytest.MonkeyPatch) -> None:
    """AC-7: with ``BUILD_GCS_QGC_MAVLINK=ON`` the GCS factory returns a ``GcsAdapter``."""

    def _make_stub(**_ignored: object) -> _GcsStub:
        return _GcsStub()

    # Arrange
    monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "ON")
    register_gcs_adapter("qgc_mavlink", _make_stub)
    cfg = Config(gcs=GcsConfig(adapter="qgc_mavlink"))

    # Act
    built = build_gcs_adapter(cfg)

    # Assert
    assert isinstance(built, GcsAdapter)
def test_ac7_gcs_factory_rejects_flag_off(monkeypatch: pytest.MonkeyPatch) -> None:
    """AC-7: with ``BUILD_GCS_QGC_MAVLINK=OFF`` the GCS factory raises a config error."""

    def _make_stub(**_ignored: object) -> _GcsStub:
        return _GcsStub()

    # Arrange — the strategy IS registered; only the build flag blocks it.
    monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "OFF")
    register_gcs_adapter("qgc_mavlink", _make_stub)
    cfg = Config(gcs=GcsConfig(adapter="qgc_mavlink"))

    # Act + Assert
    expected_message = r"BUILD_GCS_QGC_MAVLINK is OFF"
    with pytest.raises(GcsAdapterConfigError, match=expected_message):
        build_gcs_adapter(cfg)
# ----------------------------------------------------------------------
# AC-9: Error hierarchy catchability
def test_ac9_every_fc_error_is_fc_adapter_error() -> None:
    """AC-9: every listed FC error class derives from ``FcAdapterError``."""
    # Arrange
    fc_error_classes = (
        FcOpenError,
        FcEmitError,
        SigningHandshakeError,
        SourceSetSwitchError,
        SourceSetSwitchNotSupportedError,
        FcAdapterConfigError,
    )

    # Assert — all FC errors share the base; the class is the failure message
    for candidate in fc_error_classes:
        assert issubclass(candidate, FcAdapterError), candidate
def test_ac9_source_set_switch_not_supported_is_subclass_of_switch_error() -> None:
    """AC-9: ``SourceSetSwitchNotSupportedError`` derives from ``SourceSetSwitchError``."""
    # Arrange
    narrow, broad = SourceSetSwitchNotSupportedError, SourceSetSwitchError

    # Assert
    assert issubclass(narrow, broad)
def test_ac9_gcs_errors_share_base() -> None:
    """AC-9: every listed GCS error class derives from ``GcsAdapterError``."""
    # Arrange
    gcs_error_classes = (GcsEmitError, GcsAdapterConfigError)

    # Assert — the class is the failure message
    for candidate in gcs_error_classes:
        assert issubclass(candidate, GcsAdapterError), candidate
def test_ac9_fc_and_gcs_trees_are_disjoint() -> None:
    """AC-9: neither adapter error base class derives from the other."""
    # Act
    gcs_under_fc = issubclass(GcsAdapterError, FcAdapterError)
    fc_under_gcs = issubclass(FcAdapterError, GcsAdapterError)

    # Assert — catching FcAdapterError must NOT catch GcsAdapterError
    assert not gcs_under_fc
    assert not fc_under_gcs
# ----------------------------------------------------------------------
# AC-10: INFO log on build
def test_ac10_info_log_on_fc_build(
    monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
    """AC-10: building the FC adapter logs one ``c8.adapter.strategy_loaded`` record.

    The record's ``kv`` mapping must carry the configured strategy name and
    port device.
    """

    def _make_stub(**_ignored: object) -> _FcStub:
        return _FcStub()

    # Arrange
    monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
    register_fc_adapter("ardupilot_plane", _make_stub)
    fc_section = FcConfig(adapter="ardupilot_plane", port_device="/dev/ttyS0")
    cfg = Config(fc=fc_section)

    # Act
    with caplog.at_level(logging.INFO, logger="runtime_root.fc_factory"):
        build_fc_adapter(cfg)

    # Assert — exactly one strategy_loaded record
    loaded_records = [
        record
        for record in caplog.records
        if getattr(record, "kind", None) == "c8.adapter.strategy_loaded"
    ]
    assert len(loaded_records) == 1
    only_record = loaded_records[0]
    assert only_record.kv["strategy"] == "ardupilot_plane"
    assert only_record.kv["port_device"] == "/dev/ttyS0"
def test_ac10_info_log_on_gcs_build(
    monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
    """AC-10: building the GCS adapter logs one ``c8.gcs.strategy_loaded`` record."""

    def _make_stub(**_ignored: object) -> _GcsStub:
        return _GcsStub()

    # Arrange
    monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "ON")
    register_gcs_adapter("qgc_mavlink", _make_stub)
    gcs_section = GcsConfig(adapter="qgc_mavlink", port_device="/dev/ttyS1")
    cfg = Config(gcs=gcs_section)

    # Act
    with caplog.at_level(logging.INFO, logger="runtime_root.fc_factory"):
        build_gcs_adapter(cfg)

    # Assert
    loaded_records = [
        record
        for record in caplog.records
        if getattr(record, "kind", None) == "c8.gcs.strategy_loaded"
    ]
    assert len(loaded_records) == 1
# ----------------------------------------------------------------------
# NFR: build perf (loose budget — sanity check, not microbench)
def test_nfr_perf_fc_build_under_50ms(monkeypatch: pytest.MonkeyPatch) -> None:
    """NFR: one ``build_fc_adapter`` call completes within a loose 50 ms budget.

    This is a sanity check, not a micro-benchmark: a single call is timed.
    """
    # Arrange
    import time

    monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
    register_fc_adapter("ardupilot_plane", lambda **_: _FcStub())
    config = Config(fc=FcConfig(adapter="ardupilot_plane"))

    # Act — perf_counter is the highest-resolution clock available for timing
    # short intervals; monotonic() can be too coarse for a 50 ms budget on
    # some platforms.
    start = time.perf_counter()
    build_fc_adapter(config)
    elapsed_s = time.perf_counter() - start

    # Assert
    assert elapsed_s < 0.05, f"build took {elapsed_s * 1000:.2f}ms (budget 50ms)"
# ----------------------------------------------------------------------
# Coverage of the FcConfig.signing_key_source validator
def test_signing_key_source_unknown_value_rejected() -> None:
    """``FcConfig`` rejects an unrecognised ``signing_key_source`` with ``ConfigError``."""
    # Arrange
    expected_message = r"signing_key_source"

    # Act + Assert
    with pytest.raises(ConfigError, match=expected_message):
        FcConfig(adapter="ardupilot_plane", signing_key_source="garbage")
def test_gcs_summary_rate_out_of_range_rejected() -> None:
    """``GcsConfig`` rejects ``summary_rate_hz`` values just outside the valid range."""
    # AZ-397 widened the valid range to [0.5, 5.0] (AC-10); the boundary
    # cases below now fall OUTSIDE the new range.
    # Arrange — first value is too high, second is too low
    rejected_rates_hz = (5.1, 0.4)

    # Act + Assert
    for rate_hz in rejected_rates_hz:
        with pytest.raises(ConfigError, match=r"summary_rate_hz"):
            GcsConfig(summary_rate_hz=rate_hz)