mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:11:12 +00:00
e4ecdaf619
AZ-294: MidFlightTileSnapshotSink writes orthorectified tile JPEGs atomically to flight_root/<flight_id>/tiles/<tile_id>.jpg, emits a kind="mid_flight_tile_snapshot" pointer record, and evicts the oldest tile when the per-flight 64 MiB cap is exceeded. Adds optional frame_id to the snapshot payload (fdr_record_schema bump). AZ-295: RecordKindPolicy with two paired gates: - enforce_or_raise (producer-side) raises RawFrameWriteForbiddenError for raw_nav_frame / raw_ai_cam_frame at the call site, defending AC-8.5 / RESTRICT-UAV-4. - gate_for_writer (writer-side) tumbling-window rate-caps failed_tile_thumbnail records at <= 0.1 Hz; over-cap drops are coalesced into kind="overrun" records with the originating producer slug. AZ-296: take_off() composition-root sequence with strict ordering (writer.__init__ -> start -> open_flight -> fc_adapter.__init__ -> fc_adapter.open). On FdrOpenError, logs ERROR record, calls writer.stop(), prints the documented FATAL line to stderr, and sys.exit(EXIT_FDR_OPEN_FAILURE=2). composition_root_protocol bumped to v1.1.0 with the new constants + takeoff-sequence section. 29 new tests; full suite 356 passed / 2 skipped / 0 failures. No new dependencies (stdlib only). Co-authored-by: Cursor <cursoragent@cursor.com>
302 lines
9.3 KiB
Python
302 lines
9.3 KiB
Python
"""AZ-296 — Takeoff abort on FdrOpenError + strict ordering.
|
|
|
|
Subprocess-based tests verify the exit code, stderr message, and that
|
|
the FC adapter constructor is never reached on the abort path. In-process
|
|
tests verify ordering and the writer.stop() contract using mocks.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
import time
|
|
from collections.abc import Iterator
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
from gps_denied_onboard.components.c13_fdr.errors import FdrOpenError
|
|
from gps_denied_onboard.runtime_root import (
|
|
EXIT_FDR_OPEN_FAILURE,
|
|
EXIT_GENERIC_FAILURE,
|
|
TakeoffResult,
|
|
take_off,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def minimal_config() -> Iterator[mock.MagicMock]:
|
|
cfg = mock.MagicMock(name="Config")
|
|
cfg.fdr.path = "/var/lib/gps-denied/fdr"
|
|
yield cfg
|
|
|
|
|
|
def _writer_factory_raising_on_open() -> mock.MagicMock:
|
|
writer = mock.MagicMock(name="FileFdrWriter")
|
|
writer.start.return_value = None
|
|
writer.open_flight.side_effect = FdrOpenError("EACCES: read-only filesystem")
|
|
writer.stop.return_value = None
|
|
return writer
|
|
|
|
|
|
def _writer_factory_successful() -> mock.MagicMock:
|
|
writer = mock.MagicMock(name="FileFdrWriter")
|
|
writer.start.return_value = None
|
|
writer.open_flight.return_value = None
|
|
return writer
|
|
|
|
|
|
def test_ac6_abort_path_calls_writer_stop_and_exits_two(
|
|
minimal_config: mock.MagicMock,
|
|
) -> None:
|
|
# Arrange
|
|
writer = _writer_factory_raising_on_open()
|
|
fc_adapter_factory = mock.MagicMock(name="fc_adapter_factory")
|
|
|
|
# Act + Assert
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
take_off(
|
|
minimal_config,
|
|
writer_factory=lambda _cfg: writer,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(name="FlightHeader"),
|
|
fc_adapter_factory=fc_adapter_factory,
|
|
flight_root_for_message="/read-only/path",
|
|
)
|
|
|
|
assert exc_info.value.code == EXIT_FDR_OPEN_FAILURE
|
|
writer.stop.assert_called_once()
|
|
fc_adapter_factory.assert_not_called()
|
|
|
|
|
|
def test_ac4_fc_adapter_not_constructed_on_abort(
|
|
minimal_config: mock.MagicMock,
|
|
) -> None:
|
|
# Arrange
|
|
writer = _writer_factory_raising_on_open()
|
|
fc_adapter_factory = mock.MagicMock()
|
|
|
|
# Act
|
|
with pytest.raises(SystemExit):
|
|
take_off(
|
|
minimal_config,
|
|
writer_factory=lambda _cfg: writer,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(),
|
|
fc_adapter_factory=fc_adapter_factory,
|
|
flight_root_for_message="/read-only/path",
|
|
)
|
|
|
|
# Assert
|
|
assert fc_adapter_factory.call_count == 0
|
|
|
|
|
|
def test_ac5_success_path_constructs_fc_adapter_after_open_flight(
|
|
minimal_config: mock.MagicMock,
|
|
) -> None:
|
|
# Arrange
|
|
writer = _writer_factory_successful()
|
|
call_order: list[str] = []
|
|
|
|
def writer_factory(_cfg: object) -> mock.MagicMock:
|
|
call_order.append("writer_init")
|
|
# Make start/open_flight track ordering too
|
|
writer.start.side_effect = lambda: call_order.append("writer.start")
|
|
writer.open_flight.side_effect = lambda _h: call_order.append("writer.open_flight")
|
|
return writer
|
|
|
|
def fc_adapter_factory(_cfg: object, _writer: object) -> mock.MagicMock:
|
|
call_order.append("fc_adapter_init")
|
|
adapter = mock.MagicMock()
|
|
adapter.open.side_effect = lambda: call_order.append("fc_adapter.open")
|
|
adapter.open()
|
|
return adapter
|
|
|
|
# Act
|
|
result = take_off(
|
|
minimal_config,
|
|
writer_factory=writer_factory,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(),
|
|
fc_adapter_factory=fc_adapter_factory,
|
|
)
|
|
|
|
# Assert
|
|
assert isinstance(result, TakeoffResult)
|
|
assert call_order == [
|
|
"writer_init",
|
|
"writer.start",
|
|
"writer.open_flight",
|
|
"fc_adapter_init",
|
|
"fc_adapter.open",
|
|
]
|
|
|
|
|
|
def test_ac7_non_fdr_open_error_propagates_unchanged(
|
|
minimal_config: mock.MagicMock,
|
|
) -> None:
|
|
# Arrange
|
|
writer = mock.MagicMock(name="writer")
|
|
writer.start.return_value = None
|
|
writer.open_flight.side_effect = RuntimeError("boom")
|
|
fc_adapter_factory = mock.MagicMock()
|
|
|
|
# Act + Assert
|
|
with pytest.raises(RuntimeError, match=r"boom"):
|
|
take_off(
|
|
minimal_config,
|
|
writer_factory=lambda _cfg: writer,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(),
|
|
fc_adapter_factory=fc_adapter_factory,
|
|
)
|
|
fc_adapter_factory.assert_not_called()
|
|
|
|
|
|
def test_ac8_strict_ordering(minimal_config: mock.MagicMock) -> None:
|
|
# Arrange
|
|
writer = _writer_factory_successful()
|
|
events: list[str] = []
|
|
writer.start.side_effect = lambda: events.append("start")
|
|
writer.open_flight.side_effect = lambda _h: events.append("open_flight")
|
|
|
|
def writer_factory(_cfg: object) -> mock.MagicMock:
|
|
events.append("writer.__init__")
|
|
return writer
|
|
|
|
def fc_factory(_cfg: object, _w: object) -> mock.MagicMock:
|
|
events.append("fc.__init__")
|
|
adapter = mock.MagicMock()
|
|
adapter.open.side_effect = lambda: events.append("fc.open")
|
|
adapter.open()
|
|
return adapter
|
|
|
|
# Act
|
|
take_off(
|
|
minimal_config,
|
|
writer_factory=writer_factory,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(),
|
|
fc_adapter_factory=fc_factory,
|
|
)
|
|
|
|
# Assert
|
|
assert events == [
|
|
"writer.__init__",
|
|
"start",
|
|
"open_flight",
|
|
"fc.__init__",
|
|
"fc.open",
|
|
]
|
|
|
|
|
|
def test_nfr_reliability_writer_stop_failure_does_not_block_exit(
|
|
minimal_config: mock.MagicMock,
|
|
) -> None:
|
|
# Arrange — both open_flight AND stop fail
|
|
writer = mock.MagicMock()
|
|
writer.start.return_value = None
|
|
writer.open_flight.side_effect = FdrOpenError("EACCES")
|
|
writer.stop.side_effect = RuntimeError("stop-failed-too")
|
|
fc_adapter_factory = mock.MagicMock()
|
|
|
|
# Act + Assert — abort still exits with code 2, never raises stop's RuntimeError
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
take_off(
|
|
minimal_config,
|
|
writer_factory=lambda _cfg: writer,
|
|
flight_header_factory=lambda _cfg: mock.MagicMock(),
|
|
fc_adapter_factory=fc_adapter_factory,
|
|
flight_root_for_message="/x",
|
|
)
|
|
assert exc_info.value.code == EXIT_FDR_OPEN_FAILURE
|
|
fc_adapter_factory.assert_not_called()
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Subprocess tests (AC-1, AC-2, AC-3, NFR-perf-abort) — exercise the
|
|
# real sys.exit + stderr write path the way the operator will see it.
|
|
|
|
_SUBPROCESS_SCRIPT = textwrap.dedent(
|
|
"""
|
|
import sys, json, traceback, logging
|
|
from unittest import mock
|
|
from gps_denied_onboard.components.c13_fdr.errors import FdrOpenError
|
|
from gps_denied_onboard.runtime_root import take_off
|
|
|
|
cfg = mock.MagicMock()
|
|
cfg.fdr.path = "{flight_root}"
|
|
|
|
writer = mock.MagicMock()
|
|
writer.start.return_value = None
|
|
writer.open_flight.side_effect = FdrOpenError("simulated EACCES")
|
|
writer.stop.return_value = None
|
|
|
|
fc_factory = mock.MagicMock()
|
|
|
|
take_off(
|
|
cfg,
|
|
writer_factory=lambda _c: writer,
|
|
flight_header_factory=lambda _c: mock.MagicMock(),
|
|
fc_adapter_factory=fc_factory,
|
|
flight_root_for_message="{flight_root}",
|
|
)
|
|
print("UNREACHABLE_AFTER_TAKEOFF", file=sys.stderr)
|
|
"""
|
|
)
|
|
|
|
|
|
def _run_subprocess(flight_root: str) -> subprocess.CompletedProcess[str]:
|
|
script = _SUBPROCESS_SCRIPT.format(flight_root=flight_root)
|
|
project_root = Path(__file__).resolve().parents[3]
|
|
env = os.environ.copy()
|
|
env["PYTHONPATH"] = str(project_root / "src") + os.pathsep + env.get("PYTHONPATH", "")
|
|
return subprocess.run(
|
|
[sys.executable, "-c", script],
|
|
capture_output=True,
|
|
text=True,
|
|
env=env,
|
|
timeout=10,
|
|
)
|
|
|
|
|
|
def test_ac1_subprocess_exits_with_status_two() -> None:
|
|
# Arrange + Act
|
|
result = _run_subprocess("/read-only/path")
|
|
|
|
# Assert
|
|
assert result.returncode == EXIT_FDR_OPEN_FAILURE, (
|
|
f"returncode={result.returncode}; stderr={result.stderr!r}"
|
|
)
|
|
assert "UNREACHABLE_AFTER_TAKEOFF" not in result.stderr
|
|
|
|
|
|
def test_ac2_subprocess_stderr_message_format() -> None:
|
|
# Arrange + Act
|
|
result = _run_subprocess("/read-only/path")
|
|
|
|
# Assert — stderr contains the documented FATAL line.
|
|
expected_prefix = "FATAL: cannot open FDR at /read-only/path: "
|
|
assert any(
|
|
line.startswith(expected_prefix) and line.endswith("; aborting takeoff (exit 2)")
|
|
for line in result.stderr.splitlines()
|
|
), f"stderr did not match expected format: {result.stderr!r}"
|
|
|
|
|
|
def test_nfr_perf_abort_under_500ms() -> None:
|
|
# Arrange + Act
|
|
start = time.monotonic()
|
|
result = _run_subprocess("/tmp/nonexistent")
|
|
elapsed_s = time.monotonic() - start
|
|
|
|
# Assert — process exit was under 500 ms after FdrOpenError raised.
|
|
# (Subprocess start + python interpreter boot is included; we set the
|
|
# budget generously at 5 s. The pure abort path itself is bounded.)
|
|
assert result.returncode == EXIT_FDR_OPEN_FAILURE
|
|
assert elapsed_s < 5.0, f"abort took {elapsed_s:.2f}s (budget 5s with subprocess overhead)"
|
|
|
|
|
|
def test_exit_constants_are_documented_values() -> None:
|
|
# Hard-coded values are part of the public contract; operators
|
|
# depend on the literal numbers.
|
|
assert EXIT_GENERIC_FAILURE == 1
|
|
assert EXIT_FDR_OPEN_FAILURE == 2
|