[AZ-401] [AZ-400] Replay — compose_root replay-mode branch + transport seam

Wires the airborne composition root for replay-as-configuration (ADR-011):

- compose_root(config) branches on config.mode in {"live", "replay"}.
  Live behaviour is unchanged; replay builds ReplayInputAdapter,
  attaches JsonlReplaySink, and injects NoopMavlinkTransport.
- New private module runtime_root/_replay_branch.py holds the
  replay-only strategy graph + build-flag gate + calibration loader.
- Config gains Config.mode (Literal["live","replay"]) plus
  Config.replay sub-block with nested ReplayAutoSyncConfig that mirrors
  the AZ-405 AutoSyncConfig DTO; YAML loader + ENV map updated.

Absorbs the AZ-400 transport-seam retrofit that AZ-401 strictly
required but AZ-400 had not delivered:

- New MavlinkTransport Protocol (write/bytes_written/close).
- NoopMavlinkTransport (replay; build-flag gated, idempotent close,
  thread-safe byte counter).
- SerialMavlinkTransport (live, no-op restructure of existing pymavlink
  byte path; encoder retrofit to actually USE it is the AZ-558
  follow-up).

AZ-401 AC-9 (NoopMavlinkTransport.bytes_written > 0 after C8 encoders
run) is BLOCKED on AZ-558 — the encoder routing retrofit is out of
the AZ-401 task envelope (FORBIDDEN files: pymavlink_ardupilot_adapter,
msp2_inav_adapter). AZ-558 spec, batch_61_review.md, and the test's
@pytest.mark.skip rationale all carry the deferral reason.

Tests: 22 compose_root replay-branch tests + 17 transport tests.
Full regression: 2063 passed, 86 environment-skips, 1 documented
skip (AC-9 / AZ-558), 1 pre-existing flaky perf test deselected.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-14 11:55:33 +03:00
parent 8149083cac
commit 17a0d074af
19 changed files with 2156 additions and 45 deletions
@@ -0,0 +1,287 @@
"""AZ-400 retrofit — `MavlinkTransport` Protocol + Noop / Serial impls.
Covers the part of AZ-400 that the v1.0.0 sprint deferred:
the transport seam declared by the replay contract Invariant 5 and
required by AZ-401's ``compose_root`` replay branch (per
``_docs/02_document/contracts/replay/replay_protocol.md`` v2.0.0 lines
14, 109, 222, 237).
Per-test references:
- AC-Transport-1 — protocol conformance
- AC-Transport-2 — noop accepts every byte length, counts cumulatively
- AC-Transport-3 — serial forwards bytes through the underlying connection
- AC-Transport-4 — both raise on write-after-close
- AC-Transport-5 — close is idempotent
- AC-Transport-6 — build flag OFF refuses noop construction
- AC-Transport-7 — serial OSError surfaces as ``MavlinkTransportError``
"""
from __future__ import annotations
from typing import Any
from unittest import mock
import pytest
from gps_denied_onboard.components.c8_fc_adapter import MavlinkTransport
from gps_denied_onboard.components.c8_fc_adapter.errors import (
MavlinkTransportConfigError,
MavlinkTransportError,
)
from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import (
NoopMavlinkTransport,
)
from gps_denied_onboard.components.c8_fc_adapter.serial_mavlink_transport import (
SerialMavlinkTransport,
)
# ----------------------------------------------------------------------
# Fixtures
@pytest.fixture(autouse=True)
def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "ON")
class _FakeConnection:
"""Stub for ``mavutil.mavlink_connection`` — exposes ``write(bytes)``."""
def __init__(self) -> None:
self.received: list[bytes] = []
self.fail_with: Exception | None = None
def write(self, data: bytes) -> int:
if self.fail_with is not None:
raise self.fail_with
self.received.append(bytes(data))
return len(data)
# ----------------------------------------------------------------------
# AC-Transport-1: Protocol conformance
def test_noop_transport_satisfies_protocol() -> None:
# Act
transport = NoopMavlinkTransport()
# Assert
assert isinstance(transport, MavlinkTransport)
def test_serial_transport_satisfies_protocol() -> None:
# Arrange
conn = _FakeConnection()
# Act
transport = SerialMavlinkTransport(connection=conn)
# Assert
assert isinstance(transport, MavlinkTransport)
# ----------------------------------------------------------------------
# AC-Transport-2: NoopMavlinkTransport accepts and counts bytes
def test_noop_transport_counts_cumulative_bytes() -> None:
# Arrange
transport = NoopMavlinkTransport()
# Act
n1 = transport.write(b"abc")
n2 = transport.write(b"")
n3 = transport.write(b"defgh")
# Assert
assert n1 == 3
assert n2 == 0
assert n3 == 5
assert transport.bytes_written() == 8
def test_noop_transport_accepts_bytes_like_views() -> None:
# Arrange
transport = NoopMavlinkTransport()
# Act
transport.write(bytearray(b"abc"))
transport.write(memoryview(b"def"))
# Assert
assert transport.bytes_written() == 6
def test_noop_transport_rejects_non_bytes() -> None:
# Arrange
transport = NoopMavlinkTransport()
# Act / Assert
with pytest.raises(MavlinkTransportError, match="bytes-like"):
transport.write("not-bytes") # type: ignore[arg-type]
# ----------------------------------------------------------------------
# AC-Transport-3: SerialMavlinkTransport forwards bytes
def test_serial_transport_forwards_bytes_to_underlying_connection() -> None:
# Arrange
conn = _FakeConnection()
transport = SerialMavlinkTransport(connection=conn)
# Act
n = transport.write(b"hello")
# Assert
assert n == 5
assert conn.received == [b"hello"]
assert transport.bytes_written() == 5
def test_serial_transport_rejects_missing_write_method() -> None:
# Arrange
class _NoWrite:
pass
# Act / Assert
with pytest.raises(MavlinkTransportError, match=r"\.write\(bytes\)"):
SerialMavlinkTransport(connection=_NoWrite())
def test_serial_transport_rejects_none_connection() -> None:
# Act / Assert
with pytest.raises(MavlinkTransportError, match="open pymavlink connection"):
SerialMavlinkTransport(connection=None)
# ----------------------------------------------------------------------
# AC-Transport-4: write after close raises
def test_noop_transport_write_after_close_raises() -> None:
# Arrange
transport = NoopMavlinkTransport()
transport.write(b"first")
transport.close()
# Act / Assert
with pytest.raises(MavlinkTransportError, match="closed"):
transport.write(b"second")
def test_serial_transport_write_after_close_raises() -> None:
# Arrange
conn = _FakeConnection()
transport = SerialMavlinkTransport(connection=conn)
transport.write(b"first")
transport.close()
# Act / Assert
with pytest.raises(MavlinkTransportError, match="closed"):
transport.write(b"second")
# ----------------------------------------------------------------------
# AC-Transport-5: idempotent close
def test_noop_transport_close_is_idempotent() -> None:
# Arrange
transport = NoopMavlinkTransport()
# Act
transport.close()
transport.close() # must not raise
def test_serial_transport_close_is_idempotent() -> None:
# Arrange
conn = _FakeConnection()
transport = SerialMavlinkTransport(connection=conn)
# Act
transport.close()
transport.close() # must not raise
# ----------------------------------------------------------------------
# AC-Transport-6: BUILD flag gating
def test_noop_transport_build_flag_off_raises(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "OFF")
# Act / Assert
with pytest.raises(MavlinkTransportConfigError, match="BUILD_REPLAY_SINK_JSONL is OFF"):
NoopMavlinkTransport()
# ----------------------------------------------------------------------
# AC-Transport-7: SerialMavlinkTransport surfaces OSError as MavlinkTransportError
def test_serial_transport_oserror_wrapped() -> None:
# Arrange
conn = _FakeConnection()
conn.fail_with = OSError("device disconnected")
transport = SerialMavlinkTransport(connection=conn)
# Act / Assert
with pytest.raises(MavlinkTransportError, match="underlying write failed"):
transport.write(b"abc")
# ----------------------------------------------------------------------
# bytes_written reads safely after close
def test_noop_bytes_written_after_close_returns_total() -> None:
# Arrange
transport = NoopMavlinkTransport()
transport.write(b"abcd")
transport.close()
# Assert
assert transport.bytes_written() == 4
def test_serial_bytes_written_after_close_returns_total() -> None:
# Arrange
conn = _FakeConnection()
transport = SerialMavlinkTransport(connection=conn)
transport.write(b"abcdef")
transport.close()
# Assert
assert transport.bytes_written() == 6
# ----------------------------------------------------------------------
# Smoke: serial transport handles ``returned is None`` from underlying write
def test_serial_transport_falls_back_to_payload_length_when_write_returns_none() -> None:
# Arrange
conn = mock.MagicMock(spec=["write"])
conn.write.return_value = None
transport = SerialMavlinkTransport(connection=conn)
# Act
n = transport.write(b"abcde")
# Assert
assert n == 5
assert transport.bytes_written() == 5
# ----------------------------------------------------------------------
# Smoke: ad-hoc Any annotation removes pytest unused-import warnings.
_ = Any
+15 -2
View File
@@ -1,4 +1,9 @@
"""C8 FC Adapter smoke test — AC-9 (legacy) + AZ-390 public-API gate."""
"""C8 FC Adapter smoke test — AC-9 (legacy) + AZ-390 public-API gate.
AZ-401 expands the public Protocol surface with ``MavlinkTransport``,
the outbound byte-stream seam shared by ``SerialMavlinkTransport``
(live) and ``NoopMavlinkTransport`` (replay).
"""
def test_interface_importable() -> None:
@@ -7,10 +12,17 @@ def test_interface_importable() -> None:
EmittedExternalPosition,
FcAdapter,
GcsAdapter,
MavlinkTransport,
ReplaySink,
)
for sym in (FcAdapter, GcsAdapter, ReplaySink, EmittedExternalPosition):
for sym in (
FcAdapter,
GcsAdapter,
ReplaySink,
EmittedExternalPosition,
MavlinkTransport,
):
assert sym is not None
@@ -24,5 +36,6 @@ def test_internal_modules_not_in_public_all() -> None:
"EmittedExternalPosition",
"FcAdapter",
"GcsAdapter",
"MavlinkTransport",
"ReplaySink",
}
@@ -0,0 +1,697 @@
"""AZ-401 — `compose_root(config)` replay-mode branch unit tests.
Verifies the contract at ``_docs/02_document/contracts/replay/replay_protocol.md``
v2.0.0 §Composition Root + ADR-011 (replay-as-configuration). Covers
AC-1 through AC-10 of the AZ-401 task spec.
AC-9 ("``NoopMavlinkTransport.bytes_written() > 0`` after the C8 outbound
encoders run") is recorded here as a known BLOCKED case: the existing
:class:`TlogReplayFcAdapter` (AZ-399) raises on every ``emit_external_position``
call rather than routing the encoder bytes through a transport seam, so
the encoders never run in replay mode. Closing this gap requires the AP
/ iNav / QGC encoder retrofits that AZ-400 originally scoped but did
not deliver. See the batch 61 report for the deferral rationale.
"""
from __future__ import annotations
import ast
import json
from collections.abc import Iterator
from pathlib import Path
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import EstimatorOutput, PoseSourceLabel, Quat
from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock
from gps_denied_onboard.clock.wall_clock import WallClock
from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import (
NoopMavlinkTransport,
)
from gps_denied_onboard.components.c8_fc_adapter.replay_sink import (
JsonlReplaySink,
)
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
TlogReplayFcAdapter,
)
from gps_denied_onboard.config import (
Config,
ReplayAutoSyncConfig,
ReplayConfig,
RuntimeConfig,
)
from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource
from gps_denied_onboard.replay_input.interface import ReplayInputBundle
from gps_denied_onboard.runtime_root import (
CompositionError,
RuntimeRoot,
clear_strategy_registry,
compose_root,
)
from gps_denied_onboard.runtime_root._replay_branch import (
REPLAY_BUILD_FLAGS,
REPLAY_COMPONENT_KEYS,
build_replay_components,
)
_REPO_ROOT = Path(__file__).resolve().parents[2]
# ----------------------------------------------------------------------
# Shared fixtures
@pytest.fixture(autouse=True)
def _isolated_registry() -> Iterator[None]:
clear_strategy_registry()
yield
clear_strategy_registry()
@pytest.fixture
def _airborne_replay_env(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> Path:
"""Set the env vars + replay BUILD_* flags compose_root needs.
Returns the path of a synthetic camera calibration JSON the
``compose_root`` replay branch will load.
"""
calib_path = tmp_path / "calib.json"
calib_path.write_text(
json.dumps(
{
"camera_id": "test-cam",
"intrinsics_3x3": np.eye(3).tolist(),
"distortion": [0.0, 0.0, 0.0, 0.0],
"body_to_camera_se3": np.eye(4).tolist(),
"acquisition_method": "operator",
"metadata": {},
}
)
)
for name, value in (
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
("GPS_DENIED_TIER", "1"),
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
("CAMERA_CALIBRATION_PATH", str(calib_path)),
("LOG_LEVEL", "INFO"),
("LOG_SINK", "console"),
("INFERENCE_BACKEND", "pytorch_fp16"),
("FDR_PATH", "/var/lib/gps-denied/fdr"),
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
):
monkeypatch.setenv(name, value)
for flag in REPLAY_BUILD_FLAGS:
monkeypatch.setenv(flag, "ON")
return calib_path
@pytest.fixture
def _airborne_live_env(monkeypatch: pytest.MonkeyPatch) -> None:
for name, value in (
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
("GPS_DENIED_TIER", "1"),
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
("CAMERA_CALIBRATION_PATH", "/etc/gps-denied/calib.yml"),
("LOG_LEVEL", "INFO"),
("LOG_SINK", "console"),
("INFERENCE_BACKEND", "pytorch_fp16"),
("FDR_PATH", "/var/lib/gps-denied/fdr"),
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
("MAVLINK_SIGNING_KEY", "ZZZZZZZZ"),
):
monkeypatch.setenv(name, value)
def _make_replay_config(
*,
pace: str = "asap",
time_offset_ms: int | None = 0,
target_fc_dialect: str = "ardupilot_plane",
output_path: str = "/tmp/replay.jsonl",
calib_path: Path | None = None,
) -> Config:
runtime = (
RuntimeConfig()
if calib_path is None
else RuntimeConfig(camera_calibration_path=str(calib_path))
)
replay = ReplayConfig(
video_path="/dev/null/fake.mp4",
tlog_path="/dev/null/fake.tlog",
output_path=output_path,
pace=pace,
time_offset_ms=time_offset_ms,
target_fc_dialect=target_fc_dialect,
auto_sync=ReplayAutoSyncConfig(),
)
return Config(runtime=runtime, replay=replay, mode="replay")
def _make_replay_bundle(
*,
clock_kind: str = "tlog",
) -> ReplayInputBundle:
"""Build a :class:`ReplayInputBundle` with mocked strategies.
The strategies are real instances of the right classes (so AC-3
``isinstance`` checks pass) but with their internal init guards
bypassed via ``__new__`` because the production constructors open
OpenCV / pymavlink resources we don't want in the unit suite.
"""
fs = VideoFileFrameSource.__new__(VideoFileFrameSource)
fc = TlogReplayFcAdapter.__new__(TlogReplayFcAdapter)
if clock_kind == "tlog":
clock = TlogDerivedClock(source=iter([1_000_000_000, 2_000_000_000]))
else:
clock = WallClock()
return ReplayInputBundle(
frame_source=fs,
fc_adapter=fc,
clock=clock,
resolved_time_offset_ms=0,
auto_sync_result=None,
)
def _fake_replay_components_factory(
*,
bundle: ReplayInputBundle,
sink: Any | None = None,
transport: Any | None = None,
) -> Any:
"""Return a callable suitable for ``replay_components_factory``."""
def factory(_config: Config) -> tuple[dict[str, Any], tuple[str, ...]]:
components = {
"frame_source": bundle.frame_source,
"fc_adapter": bundle.fc_adapter,
"clock": bundle.clock,
"mavlink_transport": transport if transport is not None else NoopMavlinkTransport(),
"replay_sink": sink if sink is not None else mock.MagicMock(spec=JsonlReplaySink),
}
return components, REPLAY_COMPONENT_KEYS
return factory
def _make_estimator_output(seq: int = 0) -> EstimatorOutput:
return EstimatorOutput(
frame_id=uuid4(),
position_wgs84=LatLonAlt(lat_deg=49.991, lon_deg=36.221, alt_m=153.4 + seq),
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
velocity_world_mps=(1.5, -0.25, 0.0),
covariance_6x6=np.eye(6, dtype=np.float64) * 0.5,
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
last_satellite_anchor_age_ms=250,
smoothed=False,
emitted_at=1_700_000_000_000_000_000 + seq,
)
# ----------------------------------------------------------------------
# AC-1: Single composition root — `compose_replay` no longer exported
def test_ac1_compose_replay_no_longer_exported() -> None:
# Act / Assert
with pytest.raises(ImportError):
from gps_denied_onboard.runtime_root import compose_replay # noqa: F401
# The two surviving entrypoints stay importable.
from gps_denied_onboard.runtime_root import ( # noqa: F401
compose_operator,
compose_root,
)
# ----------------------------------------------------------------------
# AC-2: Live mode unchanged
def test_ac2_live_default_mode_returns_runtime_root_with_no_replay_keys(
_airborne_live_env: None,
) -> None:
# Arrange — empty config in default (live) mode
config = Config()
# Act
runtime = compose_root(config)
# Assert
assert isinstance(runtime, RuntimeRoot)
assert runtime.binary == "airborne"
# No replay-only keys leak into live mode
for key in REPLAY_COMPONENT_KEYS:
assert key not in runtime.components, (
f"live mode unexpectedly contains replay key {key!r}"
)
def test_ac2_live_explicit_mode_unchanged(_airborne_live_env: None) -> None:
# Arrange
config = Config(mode="live")
# Act
runtime = compose_root(config)
# Assert
assert runtime.components == {}
assert runtime.construction_order == ()
# ----------------------------------------------------------------------
# AC-3: Replay mode wires replay strategies
def test_ac3_replay_mode_wires_five_replay_strategies(
_airborne_replay_env: Path,
) -> None:
# Arrange
bundle = _make_replay_bundle(clock_kind="tlog")
config = _make_replay_config(calib_path=_airborne_replay_env)
factory = _fake_replay_components_factory(bundle=bundle)
# Act
runtime = compose_root(config, replay_components_factory=factory)
# Assert — every replay strategy slot is populated and typed
assert isinstance(runtime.components["frame_source"], VideoFileFrameSource)
assert isinstance(runtime.components["fc_adapter"], TlogReplayFcAdapter)
assert isinstance(runtime.components["mavlink_transport"], NoopMavlinkTransport)
assert isinstance(runtime.components["clock"], TlogDerivedClock)
# JsonlReplaySink is a MagicMock(spec=...) here so isinstance gates correctly:
assert "replay_sink" in runtime.components
# ----------------------------------------------------------------------
# AC-4: Replay-mode build-flag check
@pytest.mark.parametrize("flag", REPLAY_BUILD_FLAGS)
def test_ac4_replay_rejects_each_build_flag_off(
_airborne_replay_env: Path,
monkeypatch: pytest.MonkeyPatch,
flag: str,
) -> None:
# Arrange
monkeypatch.setenv(flag, "OFF")
config = _make_replay_config(calib_path=_airborne_replay_env)
# Act / Assert — go through the real branch (no factory) so the
# flag gate runs before the strategy constructors do.
with pytest.raises(CompositionError, match=f"{flag} is OFF"):
compose_root(config)
def test_ac4_live_with_replay_flag_off_succeeds(
_airborne_live_env: None,
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "OFF")
config = Config(mode="live")
# Act
runtime = compose_root(config)
# Assert
assert isinstance(runtime, RuntimeRoot)
# ----------------------------------------------------------------------
# AC-5: Clock injection (single instance, pace-aware)
def test_ac5_replay_pace_asap_uses_tlog_derived_clock(
_airborne_replay_env: Path,
) -> None:
# Arrange
bundle = _make_replay_bundle(clock_kind="tlog")
config = _make_replay_config(pace="asap", calib_path=_airborne_replay_env)
factory = _fake_replay_components_factory(bundle=bundle)
# Act
runtime = compose_root(config, replay_components_factory=factory)
# Assert
assert isinstance(runtime.components["clock"], TlogDerivedClock)
def test_ac5_replay_pace_realtime_uses_wall_clock(
_airborne_replay_env: Path,
) -> None:
# Arrange
bundle = _make_replay_bundle(clock_kind="wall")
config = _make_replay_config(pace="realtime", calib_path=_airborne_replay_env)
factory = _fake_replay_components_factory(bundle=bundle)
# Act
runtime = compose_root(config, replay_components_factory=factory)
# Assert
assert isinstance(runtime.components["clock"], WallClock)
def test_ac5_clock_single_instance_id_equality(
_airborne_replay_env: Path,
) -> None:
"""Invariant 2 — the same Clock instance is wired everywhere."""
# Arrange
bundle = _make_replay_bundle(clock_kind="tlog")
config = _make_replay_config(calib_path=_airborne_replay_env)
factory = _fake_replay_components_factory(bundle=bundle)
# Act
runtime = compose_root(config, replay_components_factory=factory)
# Assert — the Clock instance the bundle returned is exactly the
# one wired into the runtime.
assert runtime.components["clock"] is bundle.clock
# ----------------------------------------------------------------------
# AC-6: JSONL sink emits per tick
def test_ac6_jsonl_sink_emits_per_tick_when_runtime_drives_outputs(
_airborne_replay_env: Path,
) -> None:
# Arrange — a real (in-tmp) JsonlReplaySink so this exercises the
# production code path; we drive it directly because the runtime
# loop itself is owned by the airborne entrypoint, not compose_root.
fdr_client = mock.MagicMock(name="FdrClient")
sink_path = _airborne_replay_env.parent / "out.jsonl"
sink = JsonlReplaySink(output_path=sink_path, fdr_client=fdr_client)
bundle = _make_replay_bundle()
config = _make_replay_config(
output_path=str(sink_path), calib_path=_airborne_replay_env
)
factory = _fake_replay_components_factory(bundle=bundle, sink=sink)
# Act
runtime = compose_root(config, replay_components_factory=factory)
wired_sink = runtime.components["replay_sink"]
assert wired_sink is sink
for i in range(10):
wired_sink.emit(_make_estimator_output(seq=i))
wired_sink.close()
# Assert
lines = sink_path.read_text().splitlines()
assert len(lines) == 10
for line in lines:
json.loads(line) # each line parses as JSON
# ----------------------------------------------------------------------
# AC-7: No mode-aware imports in components (replay-aware logic confined)
def test_ac7_no_component_imports_video_file_frame_source() -> None:
"""The only file allowed to import both Live and VideoFile sources is
the runtime_root composition root.
"""
# Arrange
components_root = (
_REPO_ROOT / "src" / "gps_denied_onboard" / "components"
)
bad: list[str] = []
# Act
for py in components_root.rglob("*.py"):
text = py.read_text(encoding="utf-8")
tree = ast.parse(text)
for node in ast.walk(tree):
if isinstance(node, ast.ImportFrom):
module = node.module or ""
names = {n.name for n in node.names}
if (
"frame_source.video_file" in module
or "VideoFileFrameSource" in names
):
bad.append(str(py))
break
# Assert
assert bad == [], (
"Components must not import VideoFileFrameSource directly "
f"(replay-aware imports must live in runtime_root): {bad}"
)
def test_ac7_only_runtime_root_imports_replay_strategies() -> None:
"""The imports of the noop transport / replay sink stay in runtime_root."""
# Arrange
src_root = _REPO_ROOT / "src" / "gps_denied_onboard"
components_root = src_root / "components"
allowed_dirs = {
src_root / "runtime_root",
# The replay strategies themselves live under c8_fc_adapter, so
# their internal imports inside that component are exempt.
src_root / "components" / "c8_fc_adapter",
}
# Act / Assert — walk every component file and reject imports of
# the noop transport from outside the allowed directories.
for py in components_root.rglob("*.py"):
if any(allowed in py.parents for allowed in allowed_dirs):
continue
text = py.read_text(encoding="utf-8")
if "noop_mavlink_transport" in text:
raise AssertionError(
f"{py} imports noop_mavlink_transport — mode-aware "
"imports must stay in runtime_root."
)
# ----------------------------------------------------------------------
# AC-8: Public APIs only across components
def test_ac8_replay_branch_imports_only_public_apis() -> None:
"""The replay branch must not reach into component internals."""
# Arrange
branch_path = (
_REPO_ROOT
/ "src"
/ "gps_denied_onboard"
/ "runtime_root"
/ "_replay_branch.py"
)
text = branch_path.read_text(encoding="utf-8")
tree = ast.parse(text)
# Allowed deep imports: into the c8_fc_adapter component (the
# noop transport + the JSONL sink) and into the `replay_input`
# cross-cutting coordinator (Layer-4). Both are documented in
# module-layout.md as the replay strategy homes.
allowed_deep_prefixes = (
"gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport",
"gps_denied_onboard.components.c8_fc_adapter.replay_sink",
"gps_denied_onboard.replay_input.tlog_video_adapter",
)
# Act
for node in ast.walk(tree):
if not isinstance(node, ast.ImportFrom):
continue
module = node.module or ""
if not module.startswith("gps_denied_onboard.components"):
continue
# Public API form: `gps_denied_onboard.components.<slug>` (no further dots)
# OR an explicitly allowed deep submodule.
is_public = module.count(".") == 2
is_allowed_deep = any(
module.startswith(prefix) for prefix in allowed_deep_prefixes
)
# Assert
assert is_public or is_allowed_deep, (
f"_replay_branch imports {module!r} — must only reach into "
"component Public APIs or the documented replay strategy modules."
)
# ----------------------------------------------------------------------
# AC-9: NoopMavlinkTransport.bytes_written() > 0 — BLOCKED
@pytest.mark.skip(
reason=(
"BLOCKED on AZ-399 design choice: TlogReplayFcAdapter raises "
"FcEmitError on emit_external_position rather than routing the "
"encoder bytes through the MavlinkTransport seam. Closing this "
"gap requires retrofitting AP/iNav/QGC encoder code paths to "
"consume MavlinkTransport — see batch 61 report. NoopMavlinkTransport "
"+ MavlinkTransport Protocol classes are present (covered by "
"test_az400_mavlink_transport.py) but the wiring that makes "
"bytes_written > 0 in replay mode is deferred."
)
)
def test_ac9_noop_transport_bytes_written_after_runtime_drive() -> None:
raise NotImplementedError("see skip reason")
# ----------------------------------------------------------------------
# AC-10: Operator pre-flight C6 cache reused identically — smoke
def test_ac10_replay_does_not_alter_c6_cache_shape(
_airborne_replay_env: Path,
) -> None:
"""Smoke check that the replay branch does not register a parallel
C6 strategy under a different slug.
A real AC-10 end-to-end test requires a populated C6 + C2 wiring,
which is out of scope for AZ-401's unit suite. This check at least
asserts the replay branch never claims the ``c6_tile_cache`` slug.
"""
# Arrange
bundle = _make_replay_bundle()
config = _make_replay_config(calib_path=_airborne_replay_env)
factory = _fake_replay_components_factory(bundle=bundle)
# Act
runtime = compose_root(config, replay_components_factory=factory)
# Assert
assert "c6_tile_cache" not in runtime.components
# ----------------------------------------------------------------------
# Real `build_replay_components` path — the production wiring must
# refuse early on missing replay paths instead of crashing inside the
# adapter constructor.
def test_replay_branch_rejects_empty_video_path(
_airborne_replay_env: Path,
) -> None:
# Arrange
runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env))
config = Config(
runtime=runtime_cfg,
replay=ReplayConfig(
video_path="",
tlog_path="/dev/null/fake.tlog",
output_path="/tmp/out.jsonl",
pace="asap",
target_fc_dialect="ardupilot_plane",
),
mode="replay",
)
# Act / Assert
with pytest.raises(CompositionError, match="video_path is empty"):
build_replay_components(config)
def test_replay_branch_rejects_empty_tlog_path(
_airborne_replay_env: Path,
) -> None:
# Arrange
runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env))
config = Config(
runtime=runtime_cfg,
replay=ReplayConfig(
video_path="/dev/null/fake.mp4",
tlog_path="",
output_path="/tmp/out.jsonl",
pace="asap",
target_fc_dialect="ardupilot_plane",
),
mode="replay",
)
# Act / Assert
with pytest.raises(CompositionError, match="tlog_path is empty"):
build_replay_components(config)
def test_replay_branch_rejects_unknown_pace_after_init(
_airborne_replay_env: Path,
) -> None:
"""ReplayConfig validates pace at construction; the branch's defensive
guard catches an unsanctioned mutation path.
"""
# Arrange — bypass __post_init__ to inject an invalid value, then
# call ``build_replay_components`` to confirm the inner guard fires.
config = _make_replay_config(calib_path=_airborne_replay_env)
object.__setattr__(config.replay, "pace", "telegraph") # type: ignore[misc]
# Act / Assert
with pytest.raises(CompositionError, match="(pace|telegraph|asap)"):
build_replay_components(config)
def test_replay_branch_loads_camera_calibration_from_runtime_path(
_airborne_replay_env: Path,
) -> None:
"""The branch reads the SAME calibration JSON the live binary uses."""
# Arrange
config = _make_replay_config(calib_path=_airborne_replay_env)
# Act — run far enough to populate the bundle without hitting the
# real video / tlog readers. We do that by injecting a stub
# ``replay_input_adapter_factory`` that returns a fake adapter
# whose ``open()`` produces a trivial bundle.
bundle = _make_replay_bundle()
class _StubAdapter:
def __init__(self, **_kwargs: Any) -> None:
pass
def open(self) -> ReplayInputBundle:
return bundle
components, order = build_replay_components(
config,
replay_input_adapter_factory=lambda **_kwargs: _StubAdapter(),
sink_factory=lambda *_args: mock.MagicMock(spec=JsonlReplaySink),
)
# Assert
assert order == REPLAY_COMPONENT_KEYS
assert components["frame_source"] is bundle.frame_source
assert components["fc_adapter"] is bundle.fc_adapter
# ----------------------------------------------------------------------
# Smoke
def test_compose_root_replay_with_no_calib_path_raises(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange — set every env var EXCEPT camera calibration
for name, value in (
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
("GPS_DENIED_TIER", "1"),
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
("CAMERA_CALIBRATION_PATH", ""),
("LOG_LEVEL", "INFO"),
("LOG_SINK", "console"),
("INFERENCE_BACKEND", "pytorch_fp16"),
("FDR_PATH", "/var/lib/gps-denied/fdr"),
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
):
monkeypatch.setenv(name, value)
for flag in REPLAY_BUILD_FLAGS:
monkeypatch.setenv(flag, "ON")
config = _make_replay_config() # calib_path=None
# Act / Assert — the env-required check + replay calib check both
# surface as RequiredFieldMissing or CompositionError; either is
# acceptable provided the message names the missing field.
with pytest.raises(
(CompositionError, Exception),
match=r"(camera_calibration_path|CAMERA_CALIBRATION_PATH)",
):
compose_root(config)