mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 18:21:13 +00:00
6599d828d2
Three blackbox-harness tasks landed together — all depend only on
AZ-406 and unblock the FT-* / NFT-* scenario tasks scheduled for
batches 69+.
AZ-407 — Static fixture builders (3pt):
* tile-cache-builder/{builder.py, Dockerfile, build.sh} produces a
deterministic tile-cache-fixture Docker volume from
_docs/00_problem/input_data/. Reproducibility primitives: sorted
iteration, frozen PIL JPEG settings, FAISS HNSW32 built single-
threaded with seeded stub descriptors.
* age-injector/{age_injector.py, inject.sh} clones the volume and
shifts capture_date by N×30.44 days; tile JPEG bytes preserved
bit-identical. Emits synth-age-7mo + synth-age-13mo volumes.
* cold-boot/cold_boot_fixture.json: frozen FC pose snapshot at
Derkachi sector centre, schema v1.
* secrets/mavlink-test-passkey.txt: 64-hex with required
`# TEST ONLY` header line per AC-5. Passkey-equality test now
compares the secret line after stripping the header.
* security/cve-2025-53644.jpg: synthetic 158-byte malformed JPEG
(truncated SOS marker). OpenCV 4.11.x rejects gracefully with
imdecode → None. AZ-439 will sharpen for ASan instrumentation.
* Top-level Makefile with `make fixtures` / `make fixtures-*` /
`make e2e-tier1*` / `make unit-tests` targets.
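The capture_date shift described for age-injector can be sketched as below. This is a minimal illustration only, assuming capture_date is an ISO-8601 date string, that "ageing" a tile means moving the date into the past, and that the fractional day count is rounded to whole days; the name `shift_capture_date` is hypothetical and is not taken from age_injector.py.

```python
from datetime import date, timedelta

DAYS_PER_MONTH = 30.44  # mean month length quoted in the commit message


def shift_capture_date(capture_date: str, months: int) -> str:
    """Move an ISO date `months` mean-months into the past (assumed direction)."""
    original = date.fromisoformat(capture_date)
    # Round to whole days: `date` arithmetic ignores the fractional part anyway.
    shifted = original - timedelta(days=round(months * DAYS_PER_MONTH))
    return shifted.isoformat()


if __name__ == "__main__":
    print(shift_capture_date("2025-01-01", 7))   # synth-age-7mo style shift
    print(shift_capture_date("2025-01-01", 13))  # synth-age-13mo style shift
```

Only the metadata date changes in this sketch; the tile JPEG bytes are never touched, which matches the bit-identical guarantee stated above.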
AZ-444 — Tier-2 Jetson harness wrapper (5pt):
* run-tier2.sh rewritten as orchestrator. Detects local
(aarch64 + TIER2_HOST=localhost) vs remote (ssh into TIER2_HOST).
New flags: -k/--selector, --build-kind production|asan,
--reflash (gated behind TIER2_REFLASH_ACK=1 two-key gate),
--dry-run.
* tier2-on-jetson.sh (new) — on-device delegate. Verifies
gps-denied-onboard{,-asan}.service health; restarts with 5s
tolerance; spawns tegrastats + jtop parallel samplers; tails
ASan unit's journal in asan mode; drives docker compose with
TIER=tier2-jetson; forwards SELECTOR to pytest -k.
* docker/run-tier1.sh (new) — selector-parity sibling.
* AC-1 (selector parity) and AC-6 (reflash gating) unit-tested via
--dry-run output assertions. AC-2/AC-3/AC-4/AC-5 are hardware-
loop ACs verified by the Tier-2 runtime smoke (no Jetson in the
unit-test layer).
AZ-445 — CSV reporter + evidence bundler refinements (2pt):
* reporting/nfr_recorder.py (new) — pytest plugin. Provides the
`nfr_recorder` fixture with record_metric(name, value, ac_id)
and partial(ac_id, reason). At session end emits:
- per-nfr/<scenario_id>.json (AC-1)
- traceability-status.json with every AC ID parsed from
traceability-matrix.md, classified Covered/PARTIAL/NOT
COVERED with source scenario IDs (AC-2)
- regression-baseline.json with all numeric metrics (AC-3)
* csv_reporter.py extended — `_outcome_to_result` consults the
aggregator; rows flip PASS → PARTIAL when an AC was marked
PARTIAL by nfr_recorder (AC-4). Graceful fallback when
aggregator isn't registered (unit-test contexts).
* conftest.py registers nfr_recorder in pytest_plugins.
* New --traceability-matrix CLI flag seeds the NOT COVERED rows.
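The Covered/PARTIAL/NOT COVERED classification behind traceability-status.json might look roughly like the sketch below. The `ScenarioRecord` type, the `classify` function and the rule that PARTIAL takes precedence over Covered are assumptions made for illustration; they are not the actual nfr_recorder.py API.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ScenarioRecord:
    """Hypothetical per-scenario record; not the real nfr_recorder type."""

    scenario_id: str
    covered_acs: set[str] = field(default_factory=set)
    partial_acs: set[str] = field(default_factory=set)


def classify(matrix_ac_ids: list[str], records: list[ScenarioRecord]) -> dict[str, dict]:
    # Every AC parsed from the matrix starts as NOT COVERED (the seeded rows).
    status = {ac: {"status": "NOT COVERED", "scenarios": []} for ac in matrix_ac_ids}
    for rec in records:
        for ac in sorted(rec.covered_acs | rec.partial_acs):
            entry = status.get(ac)
            if entry is None:
                continue  # AC absent from the matrix: ignored in this sketch
            entry["scenarios"].append(rec.scenario_id)
            if ac in rec.partial_acs:
                entry["status"] = "PARTIAL"  # assumed: PARTIAL wins over Covered
            elif entry["status"] == "NOT COVERED":
                entry["status"] = "Covered"
    return status
```

Seeding every matrix AC up front is what lets an AC with no recording scenario still appear in the output as NOT COVERED rather than silently vanish.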
Build / config:
* pyproject.toml dev extras: added Pillow>=10.4,<13.0 for the
tile-cache-builder unit test (broad enough to keep torchvision's
Pillow 12 pin happy; the production builder runs inside its own
Docker image with its own pin).
* Updated test_directory_layout.py to cover 10 new files + replaced
the byte-equal passkey assertion with the header-stripping
variant.
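The header-stripping passkey comparison could be sketched as follows. This assumes the fixture file holds `#`-prefixed header lines followed by exactly one lowercase 64-hex secret line; `read_passkey` is a hypothetical helper name, not the one used in test_directory_layout.py.

```python
import re

HEX64 = re.compile(r"[0-9a-f]{64}")  # assumed: lowercase hex only


def read_passkey(text: str) -> str:
    """Return the secret line, ignoring '#' header lines and blank lines."""
    lines = [ln.strip() for ln in text.splitlines()]
    secrets = [ln for ln in lines if ln and not ln.startswith("#")]
    if len(secrets) != 1 or not HEX64.fullmatch(secrets[0]):
        raise ValueError("expected exactly one 64-hex secret line")
    return secrets[0]


if __name__ == "__main__":
    sample = "# TEST ONLY\n" + "ab" * 32 + "\n"
    print(read_passkey(sample) == "ab" * 32)
```

Comparing the stripped secret instead of raw file bytes keeps the equality check valid once the `# TEST ONLY` header is present.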
Test results:
* 157 focused tests pass (was 97 in batch 67; +60 new across this
batch). No regressions.
Module-layout / spec drift:
* AZ-407 spec text says `tests/fixtures/...`; module-layout
blackbox_tests entry (commit d7a17a8) authoritatively places the
harness under `e2e/`. Implementation followed the layout entry.
* AZ-444 spec mentions `e2e/tier2/run-tier2.sh`; AZ-406 placed it
at `e2e/jetson/run-tier2.sh`. Kept at `e2e/jetson/` for
consistency.
* Cold-boot README ownership: corrected from AZ-419 to AZ-407 per
AZ-419's own Dependencies field.
Specs archived to _docs/02_tasks/done/. Jira tickets transitioned to
In Testing on commit.
Co-authored-by: Cursor <cursoragent@cursor.com>
271 lines
11 KiB
Python
"""CSV reporter pytest plugin.

Emits one row per test with the exact columns declared in
``_docs/02_document/tests/environment.md`` § Reporting:

    test_id, test_name, traces_to, fc_adapter, vio_strategy, tier,
    started_at_utc, execution_time_ms, result, error_message, evidence_paths

Why a custom plugin rather than `pytest-csv` defaults?
- `pytest-csv` is installed as a dependency for its column-extension hooks,
  but its default emission is `name`/`status`/`duration`, whereas our matrix
  needs the `traces_to`, `fc_adapter`, `vio_strategy`, `tier`,
  `started_at_utc`, `evidence_paths` columns to feed the downstream badge
  generator and regression detector.

Result classification per AC-9:
- PASS / FAIL / SKIP map 1:1 to pytest's own outcome.
- XFAIL is emitted when the test was marked `deferred_ac(verdict="xfail",
  reason=...)` and the body raised (the standard pytest XFAIL path).
- PARTIAL replaces PASS when `nfr_recorder` marked an AC PARTIAL for the
  test (AZ-445 AC-4).

The plugin is unit-tested in ``e2e/_unit_tests/reporting/test_csv_reporter.py``.
"""

from __future__ import annotations

import csv
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import pytest

UTC = timezone.utc

CSV_COLUMNS: tuple[str, ...] = (
    "test_id",
    "test_name",
    "traces_to",
    "fc_adapter",
    "vio_strategy",
    "tier",
    "started_at_utc",
    "execution_time_ms",
    "result",
    "error_message",
    "evidence_paths",
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _parametrize_value(item: pytest.Item, name: str, default: str = "n/a") -> str:
    cs = getattr(item, "callspec", None)
    if cs is None:
        return default
    return str(cs.params.get(name, default))


def _traces_to(item: pytest.Item) -> str:
    marker = item.get_closest_marker("traces_to")
    if marker is None:
        return ""
    ids = marker.args[0] if marker.args else marker.kwargs.get("ids", "")
    if isinstance(ids, (list, tuple, set)):
        return ",".join(str(i) for i in ids)
    return str(ids)


def _test_id(item: pytest.Item) -> str:
    """Stable test id for the CSV `test_id` column.

    Prefers an explicit ``@pytest.mark.test_id("FT-P-01")`` if set, otherwise
    falls back to pytest's nodeid which is unique per parametrize variant.
    """
    marker = item.get_closest_marker("test_id")
    if marker is not None and marker.args:
        return str(marker.args[0])
    return item.nodeid


def _outcome_to_result(report: pytest.TestReport, item: pytest.Item) -> str:
    if report.outcome == "passed":
        if report.when == "call" and item.get_closest_marker("deferred_ac") is not None:
            deferred = item.get_closest_marker("deferred_ac")
            if deferred and deferred.kwargs.get("verdict") == "xfail":
                return "XFAIL"
        # AZ-445 AC-4 (PARTIAL propagation): if the NFR recorder marked
        # any AC PARTIAL for this nodeid, the row is PARTIAL instead of
        # PASS. The aggregator is the source of truth.
        try:
            # Local import keeps csv_reporter usable when nfr_recorder
            # is not loaded (e.g. in the standalone unit-test that
            # exercises csv_reporter alone).
            from .nfr_recorder import aggregator_for  # noqa: PLC0415

            aggregator = aggregator_for(item.session.config)
        except Exception:
            aggregator = None
        if aggregator is not None:
            for rec in aggregator.records():
                if rec.nodeid == report.nodeid and rec.partial_acs:
                    return "PARTIAL"
        return "PASS"
    if report.outcome == "failed":
        return "FAIL"
    if report.outcome == "skipped":
        if report.when == "call" and item.get_closest_marker("deferred_ac") is not None:
            deferred = item.get_closest_marker("deferred_ac")
            if deferred and deferred.kwargs.get("verdict") == "xfail":
                return "XFAIL"
        return "SKIP"
    # Unknown outcome: should never happen with stock pytest, but emit a
    # visible FAIL rather than swallow it silently.
    return f"FAIL ({report.outcome})"


# ---------------------------------------------------------------------------
# Row builder (exposed for unit tests)
# ---------------------------------------------------------------------------


def build_row(
    item: pytest.Item,
    report: pytest.TestReport,
    started_at_utc: str,
    execution_time_ms: int,
    evidence_paths: list[str] | None = None,
) -> dict[str, str]:
    """Build the CSV row for a finished test.

    Public function, unit-tested directly without spinning up a pytest run.
    """
    result = _outcome_to_result(report, item)
    error_message = ""
    if report.outcome == "failed":
        # `longreprtext` is the canonical pytest rendering of the traceback;
        # we collapse it to a single line for CSV friendliness and truncate
        # to keep the row from blowing past a reasonable limit.
        raw = report.longreprtext or repr(getattr(report, "longrepr", ""))
        error_message = raw.replace("\n", " | ")[:2000]
    elif report.outcome == "skipped":
        # `longrepr` on a skip is a 3-tuple (file, lineno, reason).
        if isinstance(report.longrepr, tuple) and len(report.longrepr) == 3:
            error_message = str(report.longrepr[2])
        else:
            error_message = str(getattr(report, "longrepr", ""))[:2000]

    return {
        "test_id": _test_id(item),
        "test_name": item.name,
        "traces_to": _traces_to(item),
        "fc_adapter": _parametrize_value(item, "fc_adapter"),
        "vio_strategy": _parametrize_value(item, "vio_strategy"),
        "tier": os.environ.get("TIER", "tier1-docker"),
        "started_at_utc": started_at_utc,
        "execution_time_ms": str(execution_time_ms),
        "result": result,
        "error_message": error_message,
        "evidence_paths": ",".join(evidence_paths or []),
    }


# ---------------------------------------------------------------------------
# Plugin hooks
# ---------------------------------------------------------------------------


class _CsvReporter:
    def __init__(self, output_path: Path) -> None:
        self._path = output_path
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Per-item start times so we can attribute call-phase duration accurately
        # (we want call+setup wall-clock, NOT just call duration which omits any
        # boundary-fixture setup cost).
        self._start_times: dict[str, tuple[float, str]] = {}
        self._evidence: dict[str, list[str]] = {}
        self._rows: list[dict[str, str]] = []

    # --- lifecycle hooks ---

    def pytest_runtest_logstart(self, nodeid: str, location: Any) -> None:  # noqa: ARG002 (pytest hook signature)
        self._start_times[nodeid] = (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds"))

    def pytest_runtest_logreport(self, report: pytest.TestReport) -> None:
        # We emit one row per item, taken from the `call` phase. Setup-phase
        # SKIPs (e.g. from `pytest.skip()` inside a fixture) lack a `call`
        # phase, so for those we use the `setup` phase report instead.
        item = getattr(report, "_item", None)  # populated by pytest_runtest_protocol below
        if item is None:
            return
        if report.when == "call" or (report.when == "setup" and report.outcome == "skipped"):
            start_mono, start_iso = self._start_times.get(
                report.nodeid,
                (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds")),
            )
            elapsed_ms = int((time.monotonic() - start_mono) * 1000)
            evidence = self._evidence.get(report.nodeid, [])
            row = build_row(item, report, start_iso, elapsed_ms, evidence)
            self._rows.append(row)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_protocol(self, item: pytest.Item, nextitem: pytest.Item | None) -> Any:
        # Tag the report objects with the originating item so logreport above
        # can read parametrize ids / markers without a global lookup.
        original_pytest_runtest_makereport = item.session.config.hook.pytest_runtest_makereport

        def wrapper(*args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
            report = original_pytest_runtest_makereport(*args, **kwargs)
            if report is not None:
                report._item = item  # noqa: SLF001 (intentional plugin attribute)
            return report

        item.session.config.hook.pytest_runtest_makereport = wrapper
        outcome = yield
        item.session.config.hook.pytest_runtest_makereport = original_pytest_runtest_makereport
        return outcome.get_result() if hasattr(outcome, "get_result") else None

    def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int) -> None:  # noqa: ARG002
        with self._path.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS))
            writer.writeheader()
            writer.writerows(self._rows)

    # --- public surface for the evidence_bundler plugin to attach paths ---

    def attach_evidence(self, nodeid: str, evidence_path: str) -> None:
        self._evidence.setdefault(nodeid, []).append(evidence_path)


_REPORTER_KEY = pytest.StashKey["_CsvReporter | None"]()


def pytest_addoption(parser: pytest.Parser) -> None:
    group = parser.getgroup("e2e-runner", "Blackbox e2e harness options")
    group.addoption(
        "--csv",
        action="store",
        default=None,
        help="Path to the CSV report (one row per test). Default off; set to enable.",
    )
    group.addoption(
        "--csv-columns",
        action="store",
        default=",".join(CSV_COLUMNS),
        help="Comma-separated column order. Default = environment.md § Reporting.",
    )


def pytest_configure(config: pytest.Config) -> None:
    config.stash[_REPORTER_KEY] = None
    csv_path = config.getoption("--csv")
    if csv_path:
        reporter = _CsvReporter(Path(csv_path))
        config.stash[_REPORTER_KEY] = reporter
        config.pluginmanager.register(reporter, name="e2e-csv-reporter")
    # `traces_to` and `test_id` are pytest markers; register them so
    # --strict-markers doesn't error on first use.
    config.addinivalue_line(
        "markers", "traces_to(ids): comma-separated AC/RESTRICT IDs the test exercises"
    )
    config.addinivalue_line(
        "markers", "test_id(name): override the test_id column (default = pytest nodeid)"
    )


def reporter_for(config: pytest.Config) -> _CsvReporter | None:
    """Public accessor, used by `evidence_bundler` to attach evidence paths."""
    return config.stash.get(_REPORTER_KEY, None)