mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:21:13 +00:00
eb6dc17880
Subprocess-spawned tests in e2e/_unit_tests/reporting/ crashed with "argparse.ArgumentError: argument --csv: conflicting option string: --csv" because pytest-csv (autoloaded via entry-point) and our custom plugin both register --csv. pytest's option registry does not allow overrides. Fix: drop pytest-csv from e2e/runner/requirements.txt. It was unused, dead weight, and incompatible with pytest 9.x (uses removed hookwrapper marker). Update conftest + csv_reporter comments to match. After fix: 1229/1229 in e2e/_unit_tests pass. Bug ticket creation deferred (user skipped interactive Q this session) — payload recorded in _docs/_process_leftovers/2026-05-17_csv_reporter_*.md for replay on next /autodev. Co-authored-by: Cursor <cursoragent@cursor.com>
273 lines
11 KiB
Python
273 lines
11 KiB
Python
"""CSV reporter pytest plugin.
|
|
|
|
Emits one row per test with the exact columns declared in
|
|
``_docs/02_document/tests/environment.md`` § Reporting:
|
|
|
|
test_id, test_name, traces_to, fc_adapter, vio_strategy, tier,
|
|
started_at_utc, execution_time_ms, result, error_message, evidence_paths
|
|
|
|
Why a custom plugin rather than `pytest-csv`?
|
|
- `pytest-csv`'s default emission is `name`/`status`/`duration`; our matrix
|
|
needs the `traces_to`, `fc_adapter`, `vio_strategy`, `tier`,
|
|
`started_at_utc`, `evidence_paths` columns to feed the downstream badge
|
|
generator and regression detector.
|
|
- pytest's option registry does not allow override of an already-registered
|
|
flag, so this plugin owns `--csv` exclusively — `pytest-csv` is NOT a
|
|
dependency (and is incompatible with pytest 9.x anyway).
|
|
|
|
Result classification per AC-9:
- PASS / FAIL / SKIP map 1:1 to pytest's own outcome.
- XFAIL is emitted when the test was marked `deferred_ac(verdict="xfail",
  reason=...)` and the body raised (the standard pytest XFAIL path).
- PARTIAL replaces PASS when the NFR recorder's aggregator holds a record
  with partial ACs for the test's nodeid (AZ-445 AC-4).
|
|
|
|
The plugin is unit-tested in ``e2e/_unit_tests/reporting/test_csv_reporter.py``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import os
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
UTC = timezone.utc
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
# Column names of every emitted CSV row, in output order. The order matches
# the column list quoted in the module docstring.
CSV_COLUMNS: tuple[str, ...] = (
    # --- test identity ---
    "test_id", "test_name", "traces_to",
    # --- matrix axes ---
    "fc_adapter", "vio_strategy", "tier",
    # --- timing ---
    "started_at_utc", "execution_time_ms",
    # --- verdict and supporting material ---
    "result", "error_message", "evidence_paths",
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parametrize_value(item: pytest.Item, name: str, default: str = "n/a") -> str:
|
|
cs = getattr(item, "callspec", None)
|
|
if cs is None:
|
|
return default
|
|
return str(cs.params.get(name, default))
|
|
|
|
|
|
def _traces_to(item: pytest.Item) -> str:
|
|
marker = item.get_closest_marker("traces_to")
|
|
if marker is None:
|
|
return ""
|
|
ids = marker.args[0] if marker.args else marker.kwargs.get("ids", "")
|
|
if isinstance(ids, (list, tuple, set)):
|
|
return ",".join(str(i) for i in ids)
|
|
return str(ids)
|
|
|
|
|
|
def _test_id(item: pytest.Item) -> str:
|
|
"""Stable test id for the CSV `test_id` column.
|
|
|
|
Prefers an explicit ``@pytest.mark.test_id("FT-P-01")`` if set, otherwise
|
|
falls back to pytest's nodeid which is unique per parametrize variant.
|
|
"""
|
|
marker = item.get_closest_marker("test_id")
|
|
if marker is not None and marker.args:
|
|
return str(marker.args[0])
|
|
return item.nodeid
|
|
|
|
|
|
def _outcome_to_result(report: pytest.TestReport, item: pytest.Item) -> str:
|
|
if report.outcome == "passed":
|
|
if report.when == "call" and item.get_closest_marker("deferred_ac") is not None:
|
|
deferred = item.get_closest_marker("deferred_ac")
|
|
if deferred and deferred.kwargs.get("verdict") == "xfail":
|
|
return "XFAIL"
|
|
# AZ-445 AC-4 (PARTIAL propagation): if the NFR recorder marked
|
|
# any AC PARTIAL for this nodeid, the row is PARTIAL instead of
|
|
# PASS. The aggregator is the source of truth.
|
|
try:
|
|
# Local import keeps csv_reporter usable when nfr_recorder
|
|
# is not loaded (e.g. in the standalone unit-test that
|
|
# exercises csv_reporter alone).
|
|
from .nfr_recorder import aggregator_for # noqa: PLC0415
|
|
|
|
aggregator = aggregator_for(item.session.config)
|
|
except Exception:
|
|
aggregator = None
|
|
if aggregator is not None:
|
|
for rec in aggregator.records():
|
|
if rec.nodeid == report.nodeid and rec.partial_acs:
|
|
return "PARTIAL"
|
|
return "PASS"
|
|
if report.outcome == "failed":
|
|
return "FAIL"
|
|
if report.outcome == "skipped":
|
|
if report.when == "call" and item.get_closest_marker("deferred_ac") is not None:
|
|
deferred = item.get_closest_marker("deferred_ac")
|
|
if deferred and deferred.kwargs.get("verdict") == "xfail":
|
|
return "XFAIL"
|
|
return "SKIP"
|
|
# Unknown outcome — should never happen with stock pytest, but emit a
|
|
# visible FAIL rather than swallow it silently.
|
|
return f"FAIL ({report.outcome})"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Row builder (exposed for unit tests)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_row(
    item: pytest.Item,
    report: pytest.TestReport,
    started_at_utc: str,
    execution_time_ms: int,
    evidence_paths: list[str] | None = None,
) -> dict[str, str]:
    """Build the CSV row for a finished test.

    Public function — unit-tested directly without spinning a pytest run.

    Args:
        item: The test item the report belongs to (markers, callspec, name).
        report: The report to classify; its ``outcome`` and ``longrepr`` /
            ``longreprtext`` drive ``result`` and ``error_message``.
        started_at_utc: Start timestamp, copied verbatim into the row.
        execution_time_ms: Elapsed milliseconds, stringified into the row.
        evidence_paths: Paths to join with ``","``; ``None`` means none.

    Returns:
        A dict holding one string value per CSV column.
    """
    max_message_len = 2000

    def one_line(text: str) -> str:
        # Collapse to a single line for CSV friendliness, then truncate so
        # one row cannot blow past a reasonable size. Applied to failure and
        # skip messages alike so both follow the same rule.
        return text.replace("\n", " | ")[:max_message_len]

    result = _outcome_to_result(report, item)

    error_message = ""
    if report.outcome == "failed":
        # `longreprtext` is the canonical pytest rendering of the traceback;
        # fall back to the repr of `longrepr` when it is empty.
        raw = report.longreprtext or repr(getattr(report, "longrepr", ""))
        error_message = one_line(raw)
    elif report.outcome == "skipped":
        longrepr = getattr(report, "longrepr", "")
        # `longrepr` on a skip is a 3-tuple (file, lineno, reason); only the
        # reason is worth keeping in the row.
        if isinstance(longrepr, tuple) and len(longrepr) == 3:
            reason = str(longrepr[2])
        else:
            reason = str(longrepr)
        error_message = one_line(reason)

    return {
        "test_id": _test_id(item),
        "test_name": item.name,
        "traces_to": _traces_to(item),
        "fc_adapter": _parametrize_value(item, "fc_adapter"),
        "vio_strategy": _parametrize_value(item, "vio_strategy"),
        # Tier comes from the environment, defaulting to "tier1-docker".
        "tier": os.environ.get("TIER", "tier1-docker"),
        "started_at_utc": started_at_utc,
        "execution_time_ms": str(execution_time_ms),
        "result": result,
        "error_message": error_message,
        "evidence_paths": ",".join(evidence_paths or []),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Plugin hooks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _CsvReporter:
    """Plugin object that collects one CSV row per test and writes them at session end.

    Rows are accumulated in memory by ``pytest_runtest_logreport`` and
    written in one go by ``pytest_sessionfinish``. Other plugins can attach
    evidence paths to a test through ``attach_evidence``.
    """

    def __init__(self, output_path: Path) -> None:
        """Remember *output_path* and create its parent directory if needed."""
        self._path = output_path
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Per-item start times, keyed by nodeid: (monotonic start, ISO-8601
        # UTC start). Measuring from logstart gives setup+call wall-clock,
        # NOT just the call duration, which would omit fixture setup cost.
        self._start_times: dict[str, tuple[float, str]] = {}
        # Evidence paths attached per nodeid via `attach_evidence`.
        self._evidence: dict[str, list[str]] = {}
        # Finished rows, in the order their reports arrived.
        self._rows: list[dict[str, str]] = []

    # --- lifecycle hooks ---

    def pytest_runtest_logstart(self, nodeid: str, location: Any) -> None:  # noqa: ARG002 (pytest hook signature)
        """Record the monotonic and UTC wall-clock start of *nodeid*."""
        self._start_times[nodeid] = (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds"))

    def pytest_runtest_logreport(self, report: pytest.TestReport) -> None:
        """Turn the decisive report of a test into exactly one CSV row.

        The decisive report is the ``call`` phase when there is one. A setup
        phase that was skipped (e.g. ``pytest.skip()`` inside a fixture) or
        that failed (a fixture raised) is never followed by a ``call`` phase,
        so in those cases the ``setup`` report is used instead; otherwise the
        test would be missing from the CSV altogether.
        """
        item = getattr(report, "_item", None)  # populated by pytest_runtest_protocol below
        if item is None:
            return

        setup_ended_test = report.when == "setup" and report.outcome in ("skipped", "failed")
        if report.when != "call" and not setup_ended_test:
            return

        started = self._start_times.get(report.nodeid)
        if started is None:
            # No logstart seen for this nodeid: fall back to "now", which
            # yields an elapsed time of roughly zero.
            started = (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds"))
        start_mono, start_iso = started

        elapsed_ms = int((time.monotonic() - start_mono) * 1000)
        evidence = self._evidence.get(report.nodeid, [])
        self._rows.append(build_row(item, report, start_iso, elapsed_ms, evidence))

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_protocol(self, item: pytest.Item, nextitem: pytest.Item | None) -> Any:  # noqa: ARG002
        """Tag every report made while *item* runs with ``report._item = item``.

        This lets ``pytest_runtest_logreport`` read parametrize ids and
        markers without a global lookup. It works by temporarily replacing
        ``config.hook.pytest_runtest_makereport`` with a wrapper for the
        duration of the item's protocol.
        """
        hook = item.session.config.hook
        original_makereport = hook.pytest_runtest_makereport

        def wrapper(*args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
            report = original_makereport(*args, **kwargs)
            if report is not None:
                report._item = item  # noqa: SLF001 (intentional plugin attribute)
            return report

        hook.pytest_runtest_makereport = wrapper
        try:
            # Old-style hookwrapper: the value returned by the generator is
            # ignored, so nothing is returned after the yield.
            yield
        finally:
            # Always put the original hook caller back so a wrapper bound to
            # this item cannot leak into the next one.
            hook.pytest_runtest_makereport = original_makereport

    def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int) -> None:  # noqa: ARG002
        """Write the header and all collected rows to the output path."""
        with self._path.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS))
            writer.writeheader()
            writer.writerows(self._rows)

    # --- public surface for the evidence_bundler plugin to attach paths ---

    def attach_evidence(self, nodeid: str, evidence_path: str) -> None:
        """Append *evidence_path* to the evidence list kept for *nodeid*."""
        self._evidence.setdefault(nodeid, []).append(evidence_path)
|
|
|
|
|
|
# Stash slot for the active reporter. `pytest_configure` stores None here
# unless `--csv` was given; `reporter_for` reads it back.
_REPORTER_KEY = pytest.StashKey["_CsvReporter | None"]()
|
|
|
|
|
|
def pytest_addoption(parser: pytest.Parser) -> None:
    """Register the reporter's command-line options in the ``e2e-runner`` group.

    Both options use ``action="store"``:
      * ``--csv``: output path; defaults to ``None`` (reporting disabled).
      * ``--csv-columns``: comma-separated column order; defaults to the
        names in ``CSV_COLUMNS``.

    NOTE(review): nothing in this module reads ``--csv-columns``;
    ``pytest_sessionfinish`` always writes ``CSV_COLUMNS``. Confirm whether
    the option is consumed elsewhere or should be wired in.
    """
    group = parser.getgroup("e2e-runner", "Blackbox e2e harness options")
    option_specs = (
        (
            "--csv",
            None,
            "Path to the CSV report (one row per test). Default off — set to enable.",
        ),
        (
            "--csv-columns",
            ",".join(CSV_COLUMNS),
            "Comma-separated column order. Default = environment.md § Reporting.",
        ),
    )
    for flag, default_value, help_text in option_specs:
        group.addoption(flag, action="store", default=default_value, help=help_text)
|
|
|
|
|
|
def pytest_configure(config: pytest.Config) -> None:
    """Activate the CSV reporter when ``--csv`` is set and register the markers.

    The stash slot is always initialised to ``None`` first. A reporter is
    created, stashed and registered as the ``e2e-csv-reporter`` plugin only
    when ``--csv`` has a truthy value. The ``traces_to`` and ``test_id``
    markers are registered either way.
    """
    config.stash[_REPORTER_KEY] = None

    destination = config.getoption("--csv")
    if destination:
        plugin = _CsvReporter(Path(destination))
        config.stash[_REPORTER_KEY] = plugin
        config.pluginmanager.register(plugin, name="e2e-csv-reporter")

    # `traces_to` and `test_id` are pytest markers — declaring them keeps
    # --strict-markers from erroring on first use.
    marker_lines = (
        "traces_to(ids): comma-separated AC/RESTRICT IDs the test exercises",
        "test_id(name): override the test_id column (default = pytest nodeid)",
    )
    for line in marker_lines:
        config.addinivalue_line("markers", line)
|
|
|
|
|
|
def reporter_for(config: pytest.Config) -> _CsvReporter | None:
    """Return the reporter stashed on *config*, or ``None`` if there is none.

    Public accessor — `evidence_bundler` uses it to attach evidence paths.
    """
    stash = config.stash
    return stash.get(_REPORTER_KEY, None)
|