mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 16:11:13 +00:00
007aa36fbf
Option A (minimum-deprecation, 2 SP) per user complexity-budget decision. Auto-sync stays importable as a raising stub for one cycle so external callers see a clean ReplayInputAdapterError instead of an ImportError. Full physical removal is filed as AZ-908 (cycle-5+ backlog). Production: - auto_sync.py: 700+ LOC -> 56-line no-op stub raising "auto-sync removed; supply --imu CSV instead" - tlog_video_adapter.py: 700+ LOC -> 105-line deprecated stub; ReplayInputAdapter.open() raises immediately, close() is a no-op - _replay_branch.py: dropped legacy auto-sync branch + _build_auto_sync_config; _validate_replay_paths now requires imu_csv_path; replay_input_adapter_factory parameter removed - cli/replay.py: --time-offset-ms / --skip-auto-sync / --auto-trim emit DeprecationWarning + stderr line; values ignored - tlog_replay_adapter.py + tlog_ground_truth.py docstrings: AUDIT-ONLY Tests: - DELETED test_az405_auto_sync, test_az405_replay_input_adapter, test_az698_window_alignment (covered code no longer runs) - ADDED test_az895_auto_sync_deprecated_stub (5 parametrised, pins AC-1) - test_az402_replay_cli: deprecation warnings + ignored-value asserts - test_az401_compose_root_replay: new imu_csv_path-required gate; deleted the calibration-loading test that relied on the removed replay_input_adapter_factory injection point - test_derkachi_real_tlog: xfail reason refreshed to AZ-848 + AZ-883 (AC-4 "AZ-848-scoped reason") Docs: - module-layout.md: replay_input file list flags deprecated modules, adds csv_ground_truth.py - _dependencies_table.md: +AZ-908 row, preamble + totals updated (179 -> 180 tasks, 567 -> 570 SP) - AZ-908 backlog spec added; AZ-895 spec moved todo -> done - batch_03_cycle4_report.md written Touched-module tests green (111 passed, 1 skipped). Full unit suite green: 2287 passed, 85 skipped, 1 deselected (pre-existing flaky perf test, unrelated). Co-authored-by: Cursor <cursoragent@cursor.com>
771 lines
26 KiB
Python
771 lines
26 KiB
Python
"""AZ-401 — `compose_root(config)` replay-mode branch unit tests.
|
|
|
|
Verifies the contract at ``_docs/02_document/contracts/replay/replay_protocol.md``
|
|
v2.0.0 §Composition Root + ADR-011 (replay-as-configuration). Covers
|
|
AC-1 through AC-10 of the AZ-401 task spec.
|
|
|
|
AC-9 ("``NoopMavlinkTransport.bytes_written() > 0`` after the C8 outbound
|
|
encoders run") is recorded here as a known BLOCKED case: the existing
|
|
:class:`TlogReplayFcAdapter` (AZ-399) raises on every ``emit_external_position``
|
|
call rather than routing the encoder bytes through a transport seam, so
|
|
the encoders never run in replay mode. Closing this gap requires the AP
|
|
/ iNav / QGC encoder retrofits that AZ-400 originally scoped but did
|
|
not deliver. See the batch 61 report for the deferral rationale.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import json
|
|
from collections.abc import Iterator
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from unittest import mock
|
|
from uuid import UUID, uuid4
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from gps_denied_onboard._types.geo import LatLonAlt
|
|
from gps_denied_onboard._types.state import EstimatorOutput, PoseSourceLabel, Quat
|
|
from gps_denied_onboard.clock.tlog_derived import TlogDerivedClock
|
|
from gps_denied_onboard.clock.wall_clock import WallClock
|
|
from gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport import (
|
|
NoopMavlinkTransport,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter.replay_sink import (
|
|
JsonlReplaySink,
|
|
)
|
|
from gps_denied_onboard.components.c8_fc_adapter.tlog_replay_adapter import (
|
|
TlogReplayFcAdapter,
|
|
)
|
|
from gps_denied_onboard.config import (
|
|
Config,
|
|
ConfigError,
|
|
ReplayAutoSyncConfig,
|
|
ReplayConfig,
|
|
RuntimeConfig,
|
|
)
|
|
from gps_denied_onboard.frame_source.video_file import VideoFileFrameSource
|
|
from gps_denied_onboard.replay_input.interface import ReplayInputBundle
|
|
from gps_denied_onboard.runtime_root import (
|
|
CompositionError,
|
|
RuntimeRoot,
|
|
clear_strategy_registry,
|
|
compose_root,
|
|
)
|
|
from gps_denied_onboard.runtime_root._replay_branch import (
|
|
REPLAY_BUILD_FLAGS,
|
|
REPLAY_COMPONENT_KEYS,
|
|
build_replay_components,
|
|
)
|
|
|
|
|
|
_REPO_ROOT = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Shared fixtures
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolated_registry() -> Iterator[None]:
|
|
clear_strategy_registry()
|
|
yield
|
|
clear_strategy_registry()
|
|
|
|
|
|
@pytest.fixture
|
|
def _airborne_replay_env(
|
|
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
|
|
) -> Path:
|
|
"""Set the env vars + replay BUILD_* flags compose_root needs.
|
|
|
|
Returns the path of a synthetic camera calibration JSON the
|
|
``compose_root`` replay branch will load.
|
|
"""
|
|
calib_path = tmp_path / "calib.json"
|
|
calib_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"camera_id": "test-cam",
|
|
"intrinsics_3x3": np.eye(3).tolist(),
|
|
"distortion": [0.0, 0.0, 0.0, 0.0],
|
|
"body_to_camera_se3": np.eye(4).tolist(),
|
|
"acquisition_method": "operator",
|
|
"metadata": {},
|
|
}
|
|
)
|
|
)
|
|
for name, value in (
|
|
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
|
|
("GPS_DENIED_TIER", "1"),
|
|
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
|
|
("CAMERA_CALIBRATION_PATH", str(calib_path)),
|
|
("LOG_LEVEL", "INFO"),
|
|
("LOG_SINK", "console"),
|
|
("INFERENCE_BACKEND", "pytorch_fp16"),
|
|
("FDR_PATH", "/var/lib/gps-denied/fdr"),
|
|
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
|
|
):
|
|
monkeypatch.setenv(name, value)
|
|
for flag in REPLAY_BUILD_FLAGS:
|
|
monkeypatch.setenv(flag, "ON")
|
|
return calib_path
|
|
|
|
|
|
@pytest.fixture
|
|
def _airborne_live_env(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
for name, value in (
|
|
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
|
|
("GPS_DENIED_TIER", "1"),
|
|
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
|
|
("CAMERA_CALIBRATION_PATH", "/etc/gps-denied/calib.yml"),
|
|
("LOG_LEVEL", "INFO"),
|
|
("LOG_SINK", "console"),
|
|
("INFERENCE_BACKEND", "pytorch_fp16"),
|
|
("FDR_PATH", "/var/lib/gps-denied/fdr"),
|
|
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
|
|
("MAVLINK_SIGNING_KEY", "ZZZZZZZZ"),
|
|
):
|
|
monkeypatch.setenv(name, value)
|
|
|
|
|
|
def _make_replay_config(
|
|
*,
|
|
pace: str = "asap",
|
|
time_offset_ms: int | None = 0,
|
|
target_fc_dialect: str = "ardupilot_plane",
|
|
output_path: str = "/tmp/replay.jsonl",
|
|
calib_path: Path | None = None,
|
|
) -> Config:
|
|
runtime = (
|
|
RuntimeConfig()
|
|
if calib_path is None
|
|
else RuntimeConfig(camera_calibration_path=str(calib_path))
|
|
)
|
|
# AZ-895: imu_csv_path is required by _validate_replay_paths; the
|
|
# auto-sync surface that previously accepted (video, tlog) alone
|
|
# was deprecated. tlog_path stays set so tests covering the legacy
|
|
# config field continue to round-trip it.
|
|
replay = ReplayConfig(
|
|
video_path="/dev/null/fake.mp4",
|
|
tlog_path="/dev/null/fake.tlog",
|
|
imu_csv_path="/dev/null/fake_imu.csv",
|
|
output_path=output_path,
|
|
pace=pace,
|
|
time_offset_ms=time_offset_ms,
|
|
target_fc_dialect=target_fc_dialect,
|
|
auto_sync=ReplayAutoSyncConfig(),
|
|
)
|
|
return Config(runtime=runtime, replay=replay, mode="replay")
|
|
|
|
|
|
def _make_replay_bundle(
|
|
*,
|
|
clock_kind: str = "tlog",
|
|
) -> ReplayInputBundle:
|
|
"""Build a :class:`ReplayInputBundle` with mocked strategies.
|
|
|
|
The strategies are real instances of the right classes (so AC-3
|
|
``isinstance`` checks pass) but with their internal init guards
|
|
bypassed via ``__new__`` because the production constructors open
|
|
OpenCV / pymavlink resources we don't want in the unit suite.
|
|
"""
|
|
fs = VideoFileFrameSource.__new__(VideoFileFrameSource)
|
|
fc = TlogReplayFcAdapter.__new__(TlogReplayFcAdapter)
|
|
if clock_kind == "tlog":
|
|
clock = TlogDerivedClock(source=iter([1_000_000_000, 2_000_000_000]))
|
|
else:
|
|
clock = WallClock()
|
|
return ReplayInputBundle(
|
|
frame_source=fs,
|
|
fc_adapter=fc,
|
|
clock=clock,
|
|
resolved_time_offset_ms=0,
|
|
auto_sync_result=None,
|
|
)
|
|
|
|
|
|
def _fake_replay_components_factory(
|
|
*,
|
|
bundle: ReplayInputBundle,
|
|
sink: Any | None = None,
|
|
transport: Any | None = None,
|
|
) -> Any:
|
|
"""Return a callable suitable for ``replay_components_factory``."""
|
|
|
|
def factory(_config: Config) -> tuple[dict[str, Any], tuple[str, ...]]:
|
|
components = {
|
|
"frame_source": bundle.frame_source,
|
|
"fc_adapter": bundle.fc_adapter,
|
|
"clock": bundle.clock,
|
|
"mavlink_transport": transport if transport is not None else NoopMavlinkTransport(),
|
|
"replay_sink": sink if sink is not None else mock.MagicMock(spec=JsonlReplaySink),
|
|
}
|
|
return components, REPLAY_COMPONENT_KEYS
|
|
|
|
return factory
|
|
|
|
|
|
def _make_estimator_output(seq: int = 0) -> EstimatorOutput:
|
|
return EstimatorOutput(
|
|
frame_id=uuid4(),
|
|
position_wgs84=LatLonAlt(lat_deg=49.991, lon_deg=36.221, alt_m=153.4 + seq),
|
|
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
|
|
velocity_world_mps=(1.5, -0.25, 0.0),
|
|
covariance_6x6=np.eye(6, dtype=np.float64) * 0.5,
|
|
source_label=PoseSourceLabel.SATELLITE_ANCHORED,
|
|
last_satellite_anchor_age_ms=250,
|
|
smoothed=False,
|
|
emitted_at=1_700_000_000_000_000_000 + seq,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-1: Single composition root — `compose_replay` no longer exported
|
|
|
|
|
|
def test_ac1_compose_replay_no_longer_exported() -> None:
|
|
# Act / Assert
|
|
with pytest.raises(ImportError):
|
|
from gps_denied_onboard.runtime_root import compose_replay # noqa: F401
|
|
|
|
# The two surviving entrypoints stay importable.
|
|
from gps_denied_onboard.runtime_root import ( # noqa: F401
|
|
compose_operator,
|
|
compose_root,
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-2: Live mode unchanged
|
|
|
|
|
|
def test_ac2_live_default_mode_returns_runtime_root_with_no_replay_keys(
|
|
_airborne_live_env: None,
|
|
) -> None:
|
|
# Arrange — empty config in default (live) mode
|
|
config = Config()
|
|
|
|
# Act
|
|
runtime = compose_root(config)
|
|
|
|
# Assert
|
|
assert isinstance(runtime, RuntimeRoot)
|
|
assert runtime.binary == "airborne"
|
|
# No replay-only keys leak into live mode
|
|
for key in REPLAY_COMPONENT_KEYS:
|
|
assert key not in runtime.components, (
|
|
f"live mode unexpectedly contains replay key {key!r}"
|
|
)
|
|
|
|
|
|
def test_ac2_live_explicit_mode_unchanged(_airborne_live_env: None) -> None:
|
|
# Arrange
|
|
config = Config(mode="live")
|
|
|
|
# Act
|
|
runtime = compose_root(config)
|
|
|
|
# Assert
|
|
assert runtime.components == {}
|
|
assert runtime.construction_order == ()
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-3: Replay mode wires replay strategies
|
|
|
|
|
|
def test_ac3_replay_mode_wires_five_replay_strategies(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# Arrange
|
|
bundle = _make_replay_bundle(clock_kind="tlog")
|
|
config = _make_replay_config(calib_path=_airborne_replay_env)
|
|
factory = _fake_replay_components_factory(bundle=bundle)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
|
|
# Assert — every replay strategy slot is populated and typed
|
|
assert isinstance(runtime.components["frame_source"], VideoFileFrameSource)
|
|
assert isinstance(runtime.components["fc_adapter"], TlogReplayFcAdapter)
|
|
assert isinstance(runtime.components["mavlink_transport"], NoopMavlinkTransport)
|
|
assert isinstance(runtime.components["clock"], TlogDerivedClock)
|
|
# JsonlReplaySink is a MagicMock(spec=...) here so isinstance gates correctly:
|
|
assert "replay_sink" in runtime.components
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-4: Replay-mode build-flag check
|
|
|
|
|
|
@pytest.mark.parametrize("flag", REPLAY_BUILD_FLAGS)
|
|
def test_ac4_replay_rejects_each_build_flag_off(
|
|
_airborne_replay_env: Path,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
flag: str,
|
|
) -> None:
|
|
# Arrange
|
|
monkeypatch.setenv(flag, "OFF")
|
|
config = _make_replay_config(calib_path=_airborne_replay_env)
|
|
|
|
# Act / Assert — go through the real branch (no factory) so the
|
|
# flag gate runs before the strategy constructors do.
|
|
with pytest.raises(CompositionError, match=f"{flag} is OFF"):
|
|
compose_root(config)
|
|
|
|
|
|
def test_ac4_live_with_replay_flag_off_succeeds(
|
|
_airborne_live_env: None,
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
# Arrange
|
|
monkeypatch.setenv("BUILD_VIDEO_FILE_FRAME_SOURCE", "OFF")
|
|
config = Config(mode="live")
|
|
|
|
# Act
|
|
runtime = compose_root(config)
|
|
|
|
# Assert
|
|
assert isinstance(runtime, RuntimeRoot)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-5: Clock injection (single instance, pace-aware)
|
|
|
|
|
|
def test_ac5_replay_pace_asap_uses_tlog_derived_clock(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# Arrange
|
|
bundle = _make_replay_bundle(clock_kind="tlog")
|
|
config = _make_replay_config(pace="asap", calib_path=_airborne_replay_env)
|
|
factory = _fake_replay_components_factory(bundle=bundle)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
|
|
# Assert
|
|
assert isinstance(runtime.components["clock"], TlogDerivedClock)
|
|
|
|
|
|
def test_ac5_replay_pace_realtime_uses_wall_clock(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# Arrange
|
|
bundle = _make_replay_bundle(clock_kind="wall")
|
|
config = _make_replay_config(pace="realtime", calib_path=_airborne_replay_env)
|
|
factory = _fake_replay_components_factory(bundle=bundle)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
|
|
# Assert
|
|
assert isinstance(runtime.components["clock"], WallClock)
|
|
|
|
|
|
def test_ac5_clock_single_instance_id_equality(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
"""Invariant 2 — the same Clock instance is wired everywhere."""
|
|
# Arrange
|
|
bundle = _make_replay_bundle(clock_kind="tlog")
|
|
config = _make_replay_config(calib_path=_airborne_replay_env)
|
|
factory = _fake_replay_components_factory(bundle=bundle)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
|
|
# Assert — the Clock instance the bundle returned is exactly the
|
|
# one wired into the runtime.
|
|
assert runtime.components["clock"] is bundle.clock
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-6: JSONL sink emits per tick
|
|
|
|
|
|
def test_ac6_jsonl_sink_emits_per_tick_when_runtime_drives_outputs(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# Arrange — a real (in-tmp) JsonlReplaySink so this exercises the
|
|
# production code path; we drive it directly because the runtime
|
|
# loop itself is owned by the airborne entrypoint, not compose_root.
|
|
fdr_client = mock.MagicMock(name="FdrClient")
|
|
sink_path = _airborne_replay_env.parent / "out.jsonl"
|
|
sink = JsonlReplaySink(output_path=sink_path, fdr_client=fdr_client)
|
|
bundle = _make_replay_bundle()
|
|
config = _make_replay_config(
|
|
output_path=str(sink_path), calib_path=_airborne_replay_env
|
|
)
|
|
factory = _fake_replay_components_factory(bundle=bundle, sink=sink)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
wired_sink = runtime.components["replay_sink"]
|
|
assert wired_sink is sink
|
|
for i in range(10):
|
|
wired_sink.emit(_make_estimator_output(seq=i))
|
|
wired_sink.close()
|
|
|
|
# Assert
|
|
lines = sink_path.read_text().splitlines()
|
|
assert len(lines) == 10
|
|
for line in lines:
|
|
json.loads(line) # each line parses as JSON
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-7: No mode-aware imports in components (replay-aware logic confined)
|
|
|
|
|
|
def test_ac7_no_component_imports_video_file_frame_source() -> None:
|
|
"""The only file allowed to import both Live and VideoFile sources is
|
|
the runtime_root composition root.
|
|
"""
|
|
# Arrange
|
|
components_root = (
|
|
_REPO_ROOT / "src" / "gps_denied_onboard" / "components"
|
|
)
|
|
bad: list[str] = []
|
|
|
|
# Act
|
|
for py in components_root.rglob("*.py"):
|
|
text = py.read_text(encoding="utf-8")
|
|
tree = ast.parse(text)
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.ImportFrom):
|
|
module = node.module or ""
|
|
names = {n.name for n in node.names}
|
|
if (
|
|
"frame_source.video_file" in module
|
|
or "VideoFileFrameSource" in names
|
|
):
|
|
bad.append(str(py))
|
|
break
|
|
|
|
# Assert
|
|
assert bad == [], (
|
|
"Components must not import VideoFileFrameSource directly "
|
|
f"(replay-aware imports must live in runtime_root): {bad}"
|
|
)
|
|
|
|
|
|
def test_ac7_only_runtime_root_imports_replay_strategies() -> None:
|
|
"""The imports of the noop transport / replay sink stay in runtime_root."""
|
|
# Arrange
|
|
src_root = _REPO_ROOT / "src" / "gps_denied_onboard"
|
|
components_root = src_root / "components"
|
|
allowed_dirs = {
|
|
src_root / "runtime_root",
|
|
# The replay strategies themselves live under c8_fc_adapter, so
|
|
# their internal imports inside that component are exempt.
|
|
src_root / "components" / "c8_fc_adapter",
|
|
}
|
|
|
|
# Act / Assert — walk every component file and reject imports of
|
|
# the noop transport from outside the allowed directories.
|
|
for py in components_root.rglob("*.py"):
|
|
if any(allowed in py.parents for allowed in allowed_dirs):
|
|
continue
|
|
text = py.read_text(encoding="utf-8")
|
|
if "noop_mavlink_transport" in text:
|
|
raise AssertionError(
|
|
f"{py} imports noop_mavlink_transport — mode-aware "
|
|
"imports must stay in runtime_root."
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-8: Public APIs only across components
|
|
|
|
|
|
def test_ac8_replay_branch_imports_only_public_apis() -> None:
|
|
"""The replay branch must not reach into component internals."""
|
|
# Arrange
|
|
branch_path = (
|
|
_REPO_ROOT
|
|
/ "src"
|
|
/ "gps_denied_onboard"
|
|
/ "runtime_root"
|
|
/ "_replay_branch.py"
|
|
)
|
|
text = branch_path.read_text(encoding="utf-8")
|
|
tree = ast.parse(text)
|
|
|
|
# Allowed deep imports: into the c8_fc_adapter component (the
|
|
# noop transport + the JSONL sink + the AZ-894 CSV replay adapter)
|
|
# and into the `replay_input` cross-cutting coordinator (Layer-4).
|
|
# All of these are documented in module-layout.md as the replay
|
|
# strategy homes.
|
|
allowed_deep_prefixes = (
|
|
"gps_denied_onboard.components.c8_fc_adapter.noop_mavlink_transport",
|
|
"gps_denied_onboard.components.c8_fc_adapter.replay_sink",
|
|
"gps_denied_onboard.components.c8_fc_adapter.csv_replay_adapter",
|
|
"gps_denied_onboard.replay_input.tlog_video_adapter",
|
|
)
|
|
|
|
# Act
|
|
for node in ast.walk(tree):
|
|
if not isinstance(node, ast.ImportFrom):
|
|
continue
|
|
module = node.module or ""
|
|
if not module.startswith("gps_denied_onboard.components"):
|
|
continue
|
|
# Public API form: `gps_denied_onboard.components.<slug>` (no further dots)
|
|
# OR an explicitly allowed deep submodule.
|
|
is_public = module.count(".") == 2
|
|
is_allowed_deep = any(
|
|
module.startswith(prefix) for prefix in allowed_deep_prefixes
|
|
)
|
|
# Assert
|
|
assert is_public or is_allowed_deep, (
|
|
f"_replay_branch imports {module!r} — must only reach into "
|
|
"component Public APIs or the documented replay strategy modules."
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-9: NoopMavlinkTransport.bytes_written() > 0 (closed by AZ-558)
|
|
|
|
|
|
def test_ac9_noop_transport_bytes_written_after_runtime_drive(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
"""AZ-401 AC-9 / AZ-558 AC-4: replay encoders write through the seam.
|
|
|
|
Drives 10 ``EstimatorOutput`` ticks through a replay-wired
|
|
:class:`TlogReplayFcAdapter` with a :class:`NoopMavlinkTransport`
|
|
injected as its outbound seam. After the AZ-558 retrofit the
|
|
adapter encodes ``GPS_INPUT`` + ``NAMED_VALUE_FLOAT`` per tick
|
|
and writes the packed bytes through the transport — replay
|
|
protocol Invariant 5 (encoders run in both modes; only the
|
|
transport differs).
|
|
"""
|
|
# Arrange
|
|
from pymavlink.dialects.v20 import ardupilotmega as _mavlink
|
|
|
|
monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "ON")
|
|
transport = NoopMavlinkTransport()
|
|
outbound_mav = _mavlink.MAVLink(file=None, srcSystem=1, srcComponent=1)
|
|
fc = TlogReplayFcAdapter.__new__(TlogReplayFcAdapter)
|
|
# Initialise only the slots the encoder code path consults so the
|
|
# test stays focused on the wire-routing contract (no tlog file,
|
|
# no BUILD_TLOG_REPLAY_ADAPTER gate, no decode thread).
|
|
fc._mavlink_transport = transport
|
|
fc._outbound_mav = outbound_mav
|
|
fc._sequence_number = 0
|
|
fc._clock = WallClock()
|
|
fc._clock_us_provider = lambda: int(fc._clock.monotonic_ns() // 1000)
|
|
fc._clock_ms_boot_provider = (
|
|
lambda: int(fc._clock.monotonic_ns() // 1_000_000) % 0xFFFFFFFF
|
|
)
|
|
output = EstimatorOutput(
|
|
frame_id=uuid4(),
|
|
position_wgs84=LatLonAlt(lat_deg=50.0, lon_deg=30.0, alt_m=100.0),
|
|
orientation_world_T_body=Quat(w=1.0, x=0.0, y=0.0, z=0.0),
|
|
velocity_world_mps=(0.0, 0.0, 0.0),
|
|
covariance_6x6=np.eye(6, dtype=np.float64) * 0.25,
|
|
source_label=PoseSourceLabel.VISUAL_PROPAGATED,
|
|
last_satellite_anchor_age_ms=0,
|
|
smoothed=False,
|
|
emitted_at=0,
|
|
)
|
|
|
|
# Act
|
|
for _ in range(10):
|
|
fc.emit_external_position(output)
|
|
|
|
# Assert
|
|
assert transport.bytes_written() > 0, (
|
|
f"NoopMavlinkTransport.bytes_written() = {transport.bytes_written()}; "
|
|
"expected > 0 after 10 emit_external_position calls"
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AC-10: Operator pre-flight C6 cache reused identically — smoke
|
|
|
|
|
|
def test_ac10_replay_does_not_alter_c6_cache_shape(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
"""Smoke check that the replay branch does not register a parallel
|
|
C6 strategy under a different slug.
|
|
|
|
A real AC-10 end-to-end test requires a populated C6 + C2 wiring,
|
|
which is out of scope for AZ-401's unit suite. This check at least
|
|
asserts the replay branch never claims the ``c6_tile_cache`` slug.
|
|
"""
|
|
# Arrange
|
|
bundle = _make_replay_bundle()
|
|
config = _make_replay_config(calib_path=_airborne_replay_env)
|
|
factory = _fake_replay_components_factory(bundle=bundle)
|
|
|
|
# Act
|
|
runtime = compose_root(config, replay_components_factory=factory)
|
|
|
|
# Assert
|
|
assert "c6_tile_cache" not in runtime.components
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Real `build_replay_components` path — the production wiring must
|
|
# refuse early on missing replay paths instead of crashing inside the
|
|
# adapter constructor.
|
|
|
|
|
|
def test_replay_branch_rejects_empty_video_path(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# Arrange
|
|
runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env))
|
|
config = Config(
|
|
runtime=runtime_cfg,
|
|
replay=ReplayConfig(
|
|
video_path="",
|
|
tlog_path="/dev/null/fake.tlog",
|
|
output_path="/tmp/out.jsonl",
|
|
pace="asap",
|
|
target_fc_dialect="ardupilot_plane",
|
|
),
|
|
mode="replay",
|
|
)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(CompositionError, match="video_path is empty"):
|
|
build_replay_components(config)
|
|
|
|
|
|
def test_replay_branch_rejects_missing_imu_csv_path(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
# AZ-895: imu_csv_path is required; the legacy tlog-only branch was
|
|
# removed. The (video, CSV) bundle is the only supported composition.
|
|
|
|
# Arrange
|
|
runtime_cfg = RuntimeConfig(camera_calibration_path=str(_airborne_replay_env))
|
|
config = Config(
|
|
runtime=runtime_cfg,
|
|
replay=ReplayConfig(
|
|
video_path="/dev/null/fake.mp4",
|
|
tlog_path="/dev/null/fake.tlog",
|
|
imu_csv_path="",
|
|
output_path="/tmp/out.jsonl",
|
|
pace="asap",
|
|
target_fc_dialect="ardupilot_plane",
|
|
),
|
|
mode="replay",
|
|
)
|
|
|
|
# Act / Assert
|
|
with pytest.raises(CompositionError, match="imu_csv_path is empty"):
|
|
build_replay_components(config)
|
|
|
|
|
|
def test_replay_branch_rejects_unknown_pace_after_init(
|
|
_airborne_replay_env: Path,
|
|
) -> None:
|
|
"""ReplayConfig validates pace at construction; the branch's defensive
|
|
guard catches an unsanctioned mutation path.
|
|
"""
|
|
# Arrange — bypass __post_init__ to inject an invalid value, then
|
|
# call ``build_replay_components`` to confirm the inner guard fires.
|
|
config = _make_replay_config(calib_path=_airborne_replay_env)
|
|
object.__setattr__(config.replay, "pace", "telegraph") # type: ignore[misc]
|
|
|
|
# Act / Assert
|
|
with pytest.raises(CompositionError, match="(pace|telegraph|asap)"):
|
|
build_replay_components(config)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Smoke
|
|
|
|
|
|
def test_compose_root_replay_with_no_calib_path_raises(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
# Arrange — set every env var EXCEPT camera calibration
|
|
for name, value in (
|
|
("GPS_DENIED_FC_PROFILE", "ardupilot_plane"),
|
|
("GPS_DENIED_TIER", "1"),
|
|
("DB_URL", "postgresql+psycopg://gps_denied:dev@db:5432/gps_denied"),
|
|
("CAMERA_CALIBRATION_PATH", ""),
|
|
("LOG_LEVEL", "INFO"),
|
|
("LOG_SINK", "console"),
|
|
("INFERENCE_BACKEND", "pytorch_fp16"),
|
|
("FDR_PATH", "/var/lib/gps-denied/fdr"),
|
|
("TILE_CACHE_PATH", "/var/lib/gps-denied/tiles"),
|
|
):
|
|
monkeypatch.setenv(name, value)
|
|
for flag in REPLAY_BUILD_FLAGS:
|
|
monkeypatch.setenv(flag, "ON")
|
|
config = _make_replay_config() # calib_path=None
|
|
|
|
# Act / Assert — the env-required check + replay calib check both
|
|
# surface as RequiredFieldMissing or CompositionError; either is
|
|
# acceptable provided the message names the missing field.
|
|
with pytest.raises(
|
|
(CompositionError, Exception),
|
|
match=r"(camera_calibration_path|CAMERA_CALIBRATION_PATH)",
|
|
):
|
|
compose_root(config)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# AZ-611 — ReplayConfig.skip_auto_sync_validation schema gate
|
|
|
|
|
|
def test_az611_skip_auto_sync_without_manual_offset_rejected_at_init() -> None:
|
|
"""``__post_init__`` refuses ``skip_auto_sync_validation=True`` paired
|
|
with ``time_offset_ms=None`` — the bypass is only legal once the
|
|
operator has committed to an explicit manual offset.
|
|
"""
|
|
# Act / Assert
|
|
with pytest.raises(
|
|
ConfigError,
|
|
match=r"skip_auto_sync_validation=True requires.*time_offset_ms",
|
|
):
|
|
ReplayConfig(
|
|
video_path="/dev/null/fake.mp4",
|
|
tlog_path="/dev/null/fake.tlog",
|
|
output_path="/tmp/replay.jsonl",
|
|
pace="asap",
|
|
time_offset_ms=None,
|
|
skip_auto_sync_validation=True,
|
|
target_fc_dialect="ardupilot_plane",
|
|
)
|
|
|
|
|
|
def test_az611_skip_auto_sync_with_manual_offset_accepted() -> None:
|
|
"""The legal combination — ``skip_auto_sync_validation=True`` with an
|
|
explicit ``time_offset_ms`` — constructs cleanly and round-trips
|
|
both flags onto the frozen dataclass.
|
|
"""
|
|
# Act
|
|
cfg = ReplayConfig(
|
|
video_path="/dev/null/fake.mp4",
|
|
tlog_path="/dev/null/fake.tlog",
|
|
output_path="/tmp/replay.jsonl",
|
|
pace="asap",
|
|
time_offset_ms=0,
|
|
skip_auto_sync_validation=True,
|
|
target_fc_dialect="ardupilot_plane",
|
|
)
|
|
|
|
# Assert
|
|
assert cfg.skip_auto_sync_validation is True
|
|
assert cfg.time_offset_ms == 0
|
|
|
|
|
|
def test_az611_skip_auto_sync_defaults_to_false() -> None:
|
|
"""Default-constructed ReplayConfig must not opt out of validation."""
|
|
# Act
|
|
cfg = ReplayConfig()
|
|
|
|
# Assert
|
|
assert cfg.skip_auto_sync_validation is False
|