Files
Oleksandr Bezdieniezhnykh fa3742d582 [AZ-399] [AZ-400] C8 TlogReplayFcAdapter + ReplaySink + JsonlReplaySink
Opens E-DEMO-REPLAY (AZ-265): the two C8 strategies that let the
upcoming compose_replay (AZ-401) and gps-denied-replay CLI (AZ-402)
run the production C1-C5 pipeline against a recorded (.tlog, video)
pair without touching live FC I/O.

AZ-400 lands the contract ReplaySink Protocol (emit + close per
replay_protocol.md v1.0.0) and JsonlReplaySink: orjson-serialised
JSONL, fsync-on-close, build-flag gated (BUILD_REPLAY_SINK_JSONL),
double-close idempotent, FDR mirror on open/close. The drifted
AZ-390 stub in interface.py is removed; the canonical Protocol now
lives in replay_sink.py per module-layout.md and is re-exported via
__init__.py. AZ-390 conformance test widened.

AZ-399 lands TlogReplayFcAdapter: full FcAdapter Protocol surface,
build-flag gated (BUILD_TLOG_REPLAY_ADAPTER), pymavlink stream-parse
with bounded pre-scan + fail-fast on missing required messages
(R-DEMO-3), dedicated decode thread feeding the existing AZ-391
SubscriptionBus. Outbound surface raises FcEmitError per Invariant 5;
request_source_set_switch raises SourceSetSwitchNotSupportedError.
Pacing honours Invariant 6 via Clock.sleep_until_ns. time_offset_ms
shifts every emitted received_at per Invariant 8. Non-monotonic
timestamps raise FcOpenError.

Test coverage: 188 c8_fc_adapter tests pass; 1 skipped (AZ-399 AC-1
500 MB tlog RSS bound, deferred to AZ-404 e2e behind RUN_REPLAY_E2E).
Code review: PASS_WITH_WARNINGS — 1 Medium (mapping logic duplicates
AZ-391 live decoder; intentional today, four behavioural deltas
documented), 2 Low.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 05:33:20 +03:00

433 lines
13 KiB
Python

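"""AZ-400 — `ReplaySink` Protocol + `JsonlReplaySink` unit tests.
Covers AC-1 through AC-10 of the AZ-400 task spec
(``_docs/02_tasks/todo/AZ-400_replay_jsonl_sink.md``) plus the
contract-aligned schema match against
``EstimatorOutput.__dataclass_fields__``. Beyond the numbered ACs it
also exercises emit-after-close, the open/close structured logs and
their FDR mirror, emit-progress logging, the module-level ``create``
factory, and an emit-latency smoke check.
"""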
from __future__ import annotations
import dataclasses
import json
import os
import time
from pathlib import Path
from typing import Any
from unittest import mock
from uuid import UUID, uuid4
import numpy as np
import pytest
from gps_denied_onboard._types.geo import LatLonAlt
from gps_denied_onboard._types.state import EstimatorOutput, PoseSourceLabel, Quat
from gps_denied_onboard.components.c8_fc_adapter import ReplaySink
from gps_denied_onboard.components.c8_fc_adapter.replay_sink import (
JsonlReplaySink,
ReplaySinkConfigError,
ReplaySinkError,
create,
)
# ----------------------------------------------------------------------
# Fixtures
@pytest.fixture(autouse=True)
def _build_flag_on(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``BUILD_REPLAY_SINK_JSONL`` to ``ON`` for every test in this module.

    The fixture is autouse, so a test only touches the variable itself when
    it needs the flag in a different state.
    """
    flag_name, flag_value = "BUILD_REPLAY_SINK_JSONL", "ON"
    monkeypatch.setenv(flag_name, flag_value)
@pytest.fixture
def fake_fdr_client() -> mock.MagicMock:
    """Provide a ``MagicMock`` named ``FdrClient`` to hand to the sink as its FDR client."""
    fdr_double = mock.MagicMock(name="FdrClient")
    return fdr_double
def _make_output(
    *,
    frame_id: UUID | None = None,
    covariance: np.ndarray | None = None,
    source_label: PoseSourceLabel = PoseSourceLabel.SATELLITE_ANCHORED,
    smoothed: bool = False,
    last_anchor_age_ms: int = 250,
    emitted_at: int = 1_700_000_000_000_000_000,
) -> EstimatorOutput:
    """Build an ``EstimatorOutput`` with fixed pose values for sink tests.

    Position, orientation and velocity are always the same constants; the
    keyword arguments override the remaining fields.

    Args:
        frame_id: Frame identifier; a fresh ``uuid4()`` when ``None``.
        covariance: Covariance array; ``0.5 * eye(6)`` (float64) when ``None``.
        source_label: Value stored in ``source_label``.
        smoothed: Value stored in ``smoothed``.
        last_anchor_age_ms: Value stored in ``last_satellite_anchor_age_ms``.
        emitted_at: Value stored in ``emitted_at``.

    Returns:
        The populated ``EstimatorOutput``.
    """
    # Resolve the two "None means default" arguments up front.
    if covariance is None:
        covariance = np.eye(6, dtype=np.float64) * 0.5
    if frame_id is None:
        frame_id = uuid4()

    fixed_position = LatLonAlt(lat_deg=49.991, lon_deg=36.221, alt_m=153.4)
    identity_orientation = Quat(w=1.0, x=0.0, y=0.0, z=0.0)
    fixed_velocity = (1.5, -0.25, 0.0)

    return EstimatorOutput(
        frame_id=frame_id,
        position_wgs84=fixed_position,
        orientation_world_T_body=identity_orientation,
        velocity_world_mps=fixed_velocity,
        covariance_6x6=covariance,
        source_label=source_label,
        last_satellite_anchor_age_ms=last_anchor_age_ms,
        smoothed=smoothed,
        emitted_at=emitted_at,
    )
# ----------------------------------------------------------------------
# AC-1: Protocol conformance
def test_ac1_protocol_conformance(tmp_path: Path, fake_fdr_client: mock.MagicMock) -> None:
    """AC-1: a ``JsonlReplaySink`` instance satisfies ``isinstance(..., ReplaySink)``."""
    # Act
    target = tmp_path / "out.jsonl"
    sink_under_test = JsonlReplaySink(target, fake_fdr_client)
    # Assert
    conforms = isinstance(sink_under_test, ReplaySink)
    assert conforms
    sink_under_test.close()
# ----------------------------------------------------------------------
# AC-2: One JSON per emit (100 records → 100 lines)
def test_ac2_one_json_per_emit(tmp_path: Path, fake_fdr_client: mock.MagicMock) -> None:
    """AC-2: 100 emits yield 100 lines, each of which decodes to a JSON object."""
    # Arrange
    record_count = 100
    first_emitted_at = 1_700_000_000_000_000_000
    out_path = tmp_path / "many.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    # Act
    for offset in range(record_count):
        sink.emit(_make_output(emitted_at=first_emitted_at + offset))
    sink.close()
    # Assert
    written_lines = out_path.read_text(encoding="utf-8").splitlines()
    assert len(written_lines) == record_count
    # Every line must be a self-contained JSON object.
    assert all(isinstance(json.loads(raw), dict) for raw in written_lines)
# ----------------------------------------------------------------------
# AC-3: Schema match (every dataclass field present)
def test_ac3_schema_matches_dataclass_fields(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """AC-3: the emitted JSON keys equal the field names of ``EstimatorOutput``."""
    # Arrange
    out_path = tmp_path / "schema.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    sample = _make_output()
    expected_field_names = {spec.name for spec in dataclasses.fields(EstimatorOutput)}
    # Act
    sink.emit(sample)
    sink.close()
    # Assert
    (only_line,) = out_path.read_text(encoding="utf-8").splitlines()
    emitted_keys = set(json.loads(only_line).keys())
    assert emitted_keys == expected_field_names
# ----------------------------------------------------------------------
# AC-4: numpy → flat list of 36 floats
def test_ac4_numpy_to_flat_list(tmp_path: Path, fake_fdr_client: mock.MagicMock) -> None:
    """AC-4: a 6x6 numpy covariance is written as a flat list of 36 floats."""
    # Arrange
    out_path = tmp_path / "cov.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    # The expectation is built from its own array, independent of the one emitted.
    flat_identity = np.eye(6, dtype=np.float64).flatten().tolist()
    # Act
    sink.emit(_make_output(covariance=np.eye(6, dtype=np.float64)))
    sink.close()
    # Assert
    (only_line,) = out_path.read_text(encoding="utf-8").splitlines()
    emitted_cov = json.loads(only_line)["covariance_6x6"]
    assert isinstance(emitted_cov, list)
    assert len(emitted_cov) == 36
    assert emitted_cov == flat_identity
# ----------------------------------------------------------------------
# AC-5: enum → string name (NOT the integer/value form)
def test_ac5_enum_to_name_string(tmp_path: Path, fake_fdr_client: mock.MagicMock) -> None:
    """AC-5: ``source_label`` is written as the enum member name, not its ``.value``."""
    # Arrange
    label = PoseSourceLabel.SATELLITE_ANCHORED
    out_path = tmp_path / "label.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    # Act
    sink.emit(_make_output(source_label=label))
    sink.close()
    # Assert
    (only_line,) = out_path.read_text(encoding="utf-8").splitlines()
    emitted_label = json.loads(only_line)["source_label"]
    assert emitted_label == "SATELLITE_ANCHORED"
    assert emitted_label != PoseSourceLabel.SATELLITE_ANCHORED.value
# ----------------------------------------------------------------------
# AC-6: missing parent dir raises
def test_ac6_missing_parent_dir_raises(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """AC-6: a path whose parent directory is absent raises ``ReplaySinkError``."""
    # Arrange
    absent_dir = tmp_path / "definitely_does_not_exist_dir"
    bad_path = absent_dir / "out.jsonl"
    # Act / Assert
    expect_failure = pytest.raises(
        ReplaySinkError, match="output parent directory does not exist"
    )
    with expect_failure:
        JsonlReplaySink(bad_path, fake_fdr_client)
# ----------------------------------------------------------------------
# AC-7: close fsyncs (smoke check via fsync mock + size match)
def test_ac7_close_fsyncs(tmp_path: Path, fake_fdr_client: mock.MagicMock) -> None:
    """AC-7: ``close()`` calls ``os.fsync`` exactly once and all 100 lines are on disk."""
    # Arrange
    expected_lines = 100
    out_path = tmp_path / "fsync.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    for stamp in range(expected_lines):
        sink.emit(_make_output(emitted_at=stamp))
    # Act — os.fsync is patched only for the duration of close().
    with mock.patch("os.fsync") as fsync_mock:
        sink.close()
    # Assert
    fsync_mock.assert_called_once()
    on_disk = out_path.read_text(encoding="utf-8").splitlines()
    assert len(on_disk) == expected_lines
# ----------------------------------------------------------------------
# AC-8: double close is idempotent (second call no-ops + DEBUG log)
def test_ac8_double_close_idempotent(
    tmp_path: Path, fake_fdr_client: mock.MagicMock, caplog: pytest.LogCaptureFixture
) -> None:
    """AC-8: a second ``close()`` logs a ``replay.sink.double_close`` record."""
    # Arrange
    out_path = tmp_path / "double.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    sink.emit(_make_output())
    # Act
    sink.close()
    # Drop everything logged so far so only the second close is inspected.
    caplog.clear()
    with caplog.at_level("DEBUG", logger="c8_fc_adapter.replay_sink"):
        sink.close()
    # Assert
    debug_kinds = []
    for entry in caplog.records:
        if hasattr(entry, "kind"):
            debug_kinds.append(entry.kind)  # type: ignore[attr-defined]
    assert "replay.sink.double_close" in debug_kinds
# ----------------------------------------------------------------------
# AC-9: lines_written reported on close (INFO log carries the count)
def test_ac9_lines_written_reported_on_close(
    tmp_path: Path, fake_fdr_client: mock.MagicMock, caplog: pytest.LogCaptureFixture
) -> None:
    """AC-9: the single ``replay.sink.closed`` record carries ``lines_written == 100``."""
    # Arrange
    out_path = tmp_path / "count.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    emitted = 0
    while emitted < 100:
        sink.emit(_make_output())
        emitted += 1
    # Act
    with caplog.at_level("INFO", logger="c8_fc_adapter.replay_sink"):
        sink.close()
    # Assert
    closed_records = []
    for entry in caplog.records:
        if getattr(entry, "kind", "") == "replay.sink.closed":
            closed_records.append(entry)
    assert len(closed_records) == 1
    kv = closed_records[0].kv  # type: ignore[attr-defined]
    assert kv["lines_written"] == 100
# ----------------------------------------------------------------------
# AC-10: build-flag gating
def test_ac10_build_flag_off_raises(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    fake_fdr_client: mock.MagicMock,
) -> None:
    """AC-10: with ``BUILD_REPLAY_SINK_JSONL=OFF`` construction raises ``ReplaySinkConfigError``."""
    # Arrange — set the flag to OFF for this test only.
    monkeypatch.setenv("BUILD_REPLAY_SINK_JSONL", "OFF")
    target = tmp_path / "out.jsonl"
    # Act / Assert
    expect_failure = pytest.raises(
        ReplaySinkConfigError, match="BUILD_REPLAY_SINK_JSONL is OFF"
    )
    with expect_failure:
        JsonlReplaySink(target, fake_fdr_client)
# ----------------------------------------------------------------------
# Schema fidelity — round-trip every documented per-field shape rule
def test_schema_round_trip_all_fields(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """Emit one fully specified record and check the decoded shape of every field."""
    # Arrange
    out_path = tmp_path / "round_trip.jsonl"
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    cov = np.arange(36, dtype=np.float64).reshape(6, 6) * 0.001
    frame_uuid_text = "12345678-1234-5678-1234-567812345678"
    output = _make_output(
        frame_id=UUID(frame_uuid_text),
        covariance=cov,
        source_label=PoseSourceLabel.VISUAL_PROPAGATED,
        smoothed=False,
        last_anchor_age_ms=125,
        emitted_at=1_700_000_000_000_000_001,
    )
    # Fields compared by equality; ``smoothed`` is checked by identity below.
    expected_by_key = {
        "frame_id": frame_uuid_text,
        "position_wgs84": {"lat_deg": 49.991, "lon_deg": 36.221, "alt_m": 153.4},
        "orientation_world_T_body": {"w": 1.0, "x": 0.0, "y": 0.0, "z": 0.0},
        "velocity_world_mps": [1.5, -0.25, 0.0],
        "covariance_6x6": cov.flatten().tolist(),
        "source_label": "VISUAL_PROPAGATED",
        "last_satellite_anchor_age_ms": 125,
        "emitted_at": 1_700_000_000_000_000_001,
    }
    # Act
    sink.emit(output)
    sink.close()
    # Assert
    (only_line,) = out_path.read_text(encoding="utf-8").splitlines()
    decoded = json.loads(only_line)
    for key, wanted in expected_by_key.items():
        assert decoded[key] == wanted
    # Must be the JSON boolean itself, not merely something equal to it.
    assert decoded["smoothed"] is False
# ----------------------------------------------------------------------
# Error paths
def test_emit_after_close_raises(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """``emit`` on an already closed sink raises ``ReplaySinkError``."""
    # Arrange
    target = tmp_path / "err.jsonl"
    closed_sink = JsonlReplaySink(target, fake_fdr_client)
    closed_sink.close()
    # Act / Assert
    expect_failure = pytest.raises(ReplaySinkError, match="emit on closed JsonlReplaySink")
    with expect_failure:
        closed_sink.emit(_make_output())
def test_emit_open_log_emitted(
    tmp_path: Path, fake_fdr_client: mock.MagicMock, caplog: pytest.LogCaptureFixture
) -> None:
    """Opening the sink logs one ``replay.sink.opened`` record carrying the output path."""
    # Arrange + Act
    out_path = tmp_path / "open.jsonl"
    with caplog.at_level("INFO", logger="c8_fc_adapter.replay_sink"):
        sink = JsonlReplaySink(out_path, fake_fdr_client)
        sink.close()
    # Assert
    open_records = []
    for entry in caplog.records:
        if getattr(entry, "kind", "") == "replay.sink.opened":
            open_records.append(entry)
    assert len(open_records) == 1
    kv = open_records[0].kv  # type: ignore[attr-defined]
    assert kv["output_path"] == str(out_path)
def test_fdr_open_close_events_emitted(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """The FDR client receives both ``replay.sink.opened`` and ``replay.sink.closed``."""
    # Arrange
    out_path = tmp_path / "fdr.jsonl"
    # Act
    sink = JsonlReplaySink(out_path, fake_fdr_client)
    sink.emit(_make_output())
    sink.close()
    # Assert — open + close FDR records mirror the structured log surface.
    # The record is read from the first positional argument of each enqueue call.
    enqueued_kinds = [
        invocation.args[0].payload["kind"]
        for invocation in fake_fdr_client.enqueue.call_args_list
    ]
    assert "replay.sink.opened" in enqueued_kinds
    assert "replay.sink.closed" in enqueued_kinds
def test_emit_progress_logged_every_1000(
    tmp_path: Path,
    fake_fdr_client: mock.MagicMock,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """2000 emits produce exactly two ``replay.sink.emit_progress`` records."""
    # Arrange
    target = tmp_path / "progress.jsonl"
    sink = JsonlReplaySink(target, fake_fdr_client)
    # Act
    with caplog.at_level("DEBUG", logger="c8_fc_adapter.replay_sink"):
        remaining = 2000
        while remaining:
            sink.emit(_make_output())
            remaining -= 1
        sink.close()
    # Assert
    progress_total = sum(
        1
        for entry in caplog.records
        if getattr(entry, "kind", "") == "replay.sink.emit_progress"
    )
    assert progress_total == 2  # one at 1000, one at 2000
def test_module_factory_create_returns_sink(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """The module-level ``create`` factory returns a ``JsonlReplaySink``."""
    # Act
    factory_kwargs = {
        "output_path": tmp_path / "factory.jsonl",
        "fdr_client": fake_fdr_client,
    }
    produced = create(**factory_kwargs)
    # Assert
    assert isinstance(produced, JsonlReplaySink)
    produced.close()
def test_emit_p99_latency_under_1ms(
    tmp_path: Path, fake_fdr_client: mock.MagicMock
) -> None:
    """Time 500 emits of one record and bound the p99 latency.

    Despite the 1 ms target in the test name, the enforced ceiling is 5 ms.
    """
    # Arrange
    sink = JsonlReplaySink(tmp_path / "perf.jsonl", fake_fdr_client)
    output = _make_output()
    samples_ns: list[int] = []
    now_ns = time.monotonic_ns
    # Act
    for _ in range(500):
        started = now_ns()
        sink.emit(output)
        samples_ns.append(now_ns() - started)
    sink.close()
    # Assert — orjson + unbuffered write should be well under 1ms p99 on
    # any developer host. 5ms ceiling absorbs noisy CI sandboxes.
    ordered = sorted(samples_ns)
    p99_index = int(len(ordered) * 0.99) - 1
    p99_ns = ordered[p99_index]
    assert p99_ns < 5_000_000, f"p99 emit latency {p99_ns}ns exceeded 5ms ceiling"