"""CSV reporter pytest plugin. Emits one row per test with the exact columns declared in ``_docs/02_document/tests/environment.md`` § Reporting: test_id, test_name, traces_to, fc_adapter, vio_strategy, tier, started_at_utc, execution_time_ms, result, error_message, evidence_paths Why a custom plugin rather than `pytest-csv` defaults? - `pytest-csv` is dependency-installed for its column-extension hooks, but its default emission is `name`/`status`/`duration` — our matrix needs the `traces_to`, `fc_adapter`, `vio_strategy`, `tier`, `started_at_utc`, `evidence_paths` columns to feed the downstream badge generator and regression detector. Result classification per AC-9: - PASS / FAIL / SKIP map 1:1 to pytest's own outcome. - XFAIL is emitted when the test was marked `deferred_ac(verdict="xfail", reason=...)` and the body raised (the standard pytest XFAIL path). The plugin is unit-tested in ``e2e/_unit_tests/reporting/test_csv_reporter.py``. """ from __future__ import annotations import csv import os import time from datetime import datetime, timezone UTC = timezone.utc from pathlib import Path from typing import Any import pytest CSV_COLUMNS: tuple[str, ...] = ( "test_id", "test_name", "traces_to", "fc_adapter", "vio_strategy", "tier", "started_at_utc", "execution_time_ms", "result", "error_message", "evidence_paths", ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _parametrize_value(item: pytest.Item, name: str, default: str = "n/a") -> str: cs = getattr(item, "callspec", None) if cs is None: return default return str(cs.params.get(name, default)) def _traces_to(item: pytest.Item) -> str: marker = item.get_closest_marker("traces_to") if marker is None: return "" ids = marker.args[0] if marker.args else marker.kwargs.get("ids", "") if isinstance(ids, (list, tuple, set)): return ",".join(str(i) for i in ids) return str(ids) def _test_id(item: pytest.Item) -> str: """Stable test id for the CSV `test_id` column. Prefers an explicit ``@pytest.mark.test_id("FT-P-01")`` if set, otherwise falls back to pytest's nodeid which is unique per parametrize variant. """ marker = item.get_closest_marker("test_id") if marker is not None and marker.args: return str(marker.args[0]) return item.nodeid def _outcome_to_result(report: pytest.TestReport, item: pytest.Item) -> str: if report.outcome == "passed": if report.when == "call" and item.get_closest_marker("deferred_ac") is not None: deferred = item.get_closest_marker("deferred_ac") if deferred and deferred.kwargs.get("verdict") == "xfail": return "XFAIL" # AZ-445 AC-4 (PARTIAL propagation): if the NFR recorder marked # any AC PARTIAL for this nodeid, the row is PARTIAL instead of # PASS. The aggregator is the source of truth. try: # Local import keeps csv_reporter usable when nfr_recorder # is not loaded (e.g. in the standalone unit-test that # exercises csv_reporter alone). from .nfr_recorder import aggregator_for # noqa: PLC0415 aggregator = aggregator_for(item.session.config) except Exception: aggregator = None if aggregator is not None: for rec in aggregator.records(): if rec.nodeid == report.nodeid and rec.partial_acs: return "PARTIAL" return "PASS" if report.outcome == "failed": return "FAIL" if report.outcome == "skipped": if report.when == "call" and item.get_closest_marker("deferred_ac") is not None: deferred = item.get_closest_marker("deferred_ac") if deferred and deferred.kwargs.get("verdict") == "xfail": return "XFAIL" return "SKIP" # Unknown outcome — should never happen with stock pytest, but emit a # visible FAIL rather than swallow it silently. return f"FAIL ({report.outcome})" # --------------------------------------------------------------------------- # Row builder (exposed for unit tests) # --------------------------------------------------------------------------- def build_row( item: pytest.Item, report: pytest.TestReport, started_at_utc: str, execution_time_ms: int, evidence_paths: list[str] | None = None, ) -> dict[str, str]: """Build the CSV row for a finished test. Public function — unit-tested directly without spinning a pytest run. """ result = _outcome_to_result(report, item) error_message = "" if report.outcome == "failed": # `longreprtext` is the canonical pytest rendering of the traceback; # we collapse it to a single line for CSV friendliness and truncate # to keep the row from blowing past a reasonable limit. raw = report.longreprtext or repr(getattr(report, "longrepr", "")) error_message = raw.replace("\n", " | ")[:2000] elif report.outcome == "skipped": # `longrepr` on a skip is a 3-tuple (file, lineno, reason). if isinstance(report.longrepr, tuple) and len(report.longrepr) == 3: error_message = str(report.longrepr[2]) else: error_message = str(getattr(report, "longrepr", ""))[:2000] return { "test_id": _test_id(item), "test_name": item.name, "traces_to": _traces_to(item), "fc_adapter": _parametrize_value(item, "fc_adapter"), "vio_strategy": _parametrize_value(item, "vio_strategy"), "tier": os.environ.get("TIER", "tier1-docker"), "started_at_utc": started_at_utc, "execution_time_ms": str(execution_time_ms), "result": result, "error_message": error_message, "evidence_paths": ",".join(evidence_paths or []), } # --------------------------------------------------------------------------- # Plugin hooks # --------------------------------------------------------------------------- class _CsvReporter: def __init__(self, output_path: Path) -> None: self._path = output_path self._path.parent.mkdir(parents=True, exist_ok=True) # Per-item start times so we can attribute call-phase duration accurately # (we want call+setup wall-clock, NOT just call duration which omits any # boundary-fixture setup cost). self._start_times: dict[str, tuple[float, str]] = {} self._evidence: dict[str, list[str]] = {} self._rows: list[dict[str, str]] = [] # --- lifecycle hooks --- def pytest_runtest_logstart(self, nodeid: str, location: Any) -> None: # noqa: ARG002 (pytest hook signature) self._start_times[nodeid] = (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds")) def pytest_runtest_logreport(self, report: pytest.TestReport) -> None: # We emit one row per item, taken from the `call` phase. Setup-phase # SKIPs (e.g. from `pytest.skip()` inside a fixture) lack a `call` # phase, so for those we use the `setup` phase report instead. item = getattr(report, "_item", None) # populated by pytest_runtest_protocol below if item is None: return if report.when == "call" or (report.when == "setup" and report.outcome == "skipped"): start_mono, start_iso = self._start_times.get(report.nodeid, (time.monotonic(), datetime.now(UTC).isoformat(timespec="seconds"))) elapsed_ms = int((time.monotonic() - start_mono) * 1000) evidence = self._evidence.get(report.nodeid, []) row = build_row(item, report, start_iso, elapsed_ms, evidence) self._rows.append(row) @pytest.hookimpl(hookwrapper=True) def pytest_runtest_protocol(self, item: pytest.Item, nextitem: pytest.Item | None) -> Any: # Tag the report objects with the originating item so logreport above # can read parametrize ids / markers without a global lookup. original_pytest_runtest_makereport = item.session.config.hook.pytest_runtest_makereport def wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401 report = original_pytest_runtest_makereport(*args, **kwargs) if report is not None: report._item = item # noqa: SLF001 (intentional plugin attribute) return report item.session.config.hook.pytest_runtest_makereport = wrapper outcome = yield item.session.config.hook.pytest_runtest_makereport = original_pytest_runtest_makereport return outcome.get_result() if hasattr(outcome, "get_result") else None def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int) -> None: # noqa: ARG002 with self._path.open("w", newline="", encoding="utf-8") as fh: writer = csv.DictWriter(fh, fieldnames=list(CSV_COLUMNS)) writer.writeheader() writer.writerows(self._rows) # --- public surface for the evidence_bundler plugin to attach paths --- def attach_evidence(self, nodeid: str, evidence_path: str) -> None: self._evidence.setdefault(nodeid, []).append(evidence_path) _REPORTER_KEY = pytest.StashKey["_CsvReporter | None"]() def pytest_addoption(parser: pytest.Parser) -> None: group = parser.getgroup("e2e-runner", "Blackbox e2e harness options") group.addoption( "--csv", action="store", default=None, help="Path to the CSV report (one row per test). Default off — set to enable.", ) group.addoption( "--csv-columns", action="store", default=",".join(CSV_COLUMNS), help="Comma-separated column order. Default = environment.md § Reporting.", ) def pytest_configure(config: pytest.Config) -> None: config.stash[_REPORTER_KEY] = None csv_path = config.getoption("--csv") if csv_path: reporter = _CsvReporter(Path(csv_path)) config.stash[_REPORTER_KEY] = reporter config.pluginmanager.register(reporter, name="e2e-csv-reporter") # `traces_to` and `test_id` are pytest markers — register them so # --strict-markers doesn't error on first use. config.addinivalue_line( "markers", "traces_to(ids): comma-separated AC/RESTRICT IDs the test exercises" ) config.addinivalue_line( "markers", "test_id(name): override the test_id column (default = pytest nodeid)" ) def reporter_for(config: pytest.Config) -> _CsvReporter | None: """Public accessor — used by `evidence_bundler` to attach evidence paths.""" return config.stash.get(_REPORTER_KEY, None)