mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 15:01:13 +00:00
33e683dc0f
Batch 89 — adds optional `band`, `ci95_low`, `ci95_high` kw-only parameters to `_NfrRecorder.record_metric` and emits a new per-metric report.csv artifact (one row per scenario × metric, columns: scenario_id, metric_name, value, value_band, ci95_low, ci95_high, ac_id, outcome). Backwards compatible — existing 4-arg callers unchanged; unbalanced ci95 pair raises ValueError. report.csv is written once per pytest session from `pytest_sessionfinish` so the annotation pass runs once per CI invocation regardless of (fc_adapter, vio_strategy) (AC-3). `regression-baseline.json` intentionally kept flat to preserve the diff contract used by regression-detection tooling. NFT-RES-03 + NFT-PERF-01 scenarios updated to pass real bands and compute empirical 2.5/97.5-percentile ci95 from their own sample streams (per-iteration envelope ratios for Monte Carlo, per-frame latency samples for N-sample latency). Tests: 1229 e2e/_unit_tests pass (+6 vs. batch 88 for AZ-446 band/CI behavior, value-error on unbalanced ci95, report.csv columns, explicit-path override, and end-to-end emission via the pytest plugin). Code review: PASS_WITH_WARNINGS — 1 Low (empirical-CI semantics, documented inline), 1 Medium carried over from batch 88's cumulative-review backlog (write_csv_evidence + _resolve_fixture_path duplication is outside AZ-446 reporting scope). This commit closes Step 10 Implement Tests for cycle 1 (41 of 41 blackbox-test tasks done, AZ-406..AZ-446). Greenfield auto-chains to Step 11 Run Tests next. Co-authored-by: Cursor <cursoragent@cursor.com>
515 lines
18 KiB
Python
515 lines
18 KiB
Python
"""NFR metrics recorder + run-end aggregator (AZ-445).
|
||
|
||
Extends the AZ-406 reporting subsystem with four additional artifacts:

* ``per-nfr/<scenario_id>.json`` — canonical metric blob per NFT scenario.
* ``traceability-status.json`` — per-AC coverage roll-up across the run.
* ``regression-baseline.json`` — flat dump of every numeric metric the
  run captured (diffable across runs).
* ``report.csv`` — flat per-metric report, one row per scenario × metric
  (AZ-446).
|
||
|
||
Public API (used by NFT scenario tests):
|
||
|
||
def test_nft_perf_01_partition_latency_p95(nfr_recorder):
|
||
nfr_recorder.record_metric("latency_ms_p95", 380.4, ac_id="AC-4.1")
|
||
nfr_recorder.partial(
|
||
"AC-4.1",
|
||
"p95 exceeds 400 ms when chamber is enabled (deferred to NFT-PERF-01b)",
|
||
)
|
||
|
||
The recorder also exposes ``recorder.scenario_id`` for tests that need
|
||
to name their evidence files consistently with the per-NFR JSON.
|
||
|
||
PARTIAL propagation: ``recorder.partial(ac_id, reason)`` marks the
|
||
current test row as PARTIAL in the CSV reporter and the corresponding
|
||
AC as PARTIAL in the traceability roll-up. Tests that PASS without
|
||
calling ``partial`` are recorded as Covered.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import json
|
||
import logging
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import pytest
|
||
|
||
from .csv_reporter import reporter_for
|
||
|
||
|
||
def _stringify(value: Any) -> str:
    """Render *value* as the text of a single CSV cell.

    ``None`` becomes an empty cell, floats are formatted with six
    significant digits (``.6g``), and every other value goes through
    ``str``.
    """
    if isinstance(value, float):
        return format(value, ".6g")
    return "" if value is None else str(value)
|
||
|
||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ───────────────────────── data model ─────────────────────────
|
||
|
||
|
||
@dataclass
class _ScenarioRecord:
    """Everything captured for one test node during the session."""

    # Identifier used for the per-NFR JSON file name and the report rows.
    scenario_id: str
    # pytest node id; the aggregator keys its records by this value.
    nodeid: str
    # AC / RESTRICT ids taken from the test's ``traces_to`` marker.
    traces_to: tuple[str, ...]
    # metric name → entry dict (``value``, ``ac_id`` plus optional
    # ``band`` / ``ci95_low`` / ``ci95_high``).
    metrics: dict[str, Any] = field(default_factory=dict)
    # ac_id → reason the AC is only partially covered.
    partial_acs: dict[str, str] = field(default_factory=dict)
    # Stays ``None`` until the logreport hook stores the test outcome.
    outcome: str | None = None
|
||
|
||
|
||
# ───────────────────── traceability matrix parser ─────────────────────
|
||
|
||
|
||
# A declared ID is the first cell of a markdown table row: the line starts
# with ``|`` and the cell holds ``AC-…`` or ``RESTRICT-…``.
_AC_ROW_RE = re.compile(r"^\|\s*(AC-[A-Za-z0-9\.-]+)\s*\|", re.MULTILINE)
_RESTRICT_ROW_RE = re.compile(
    r"^\|\s*(RESTRICT-[A-Za-z0-9\.-]+)\s*\|", re.MULTILINE
)


def parse_traceability_matrix(matrix_path: Path) -> list[str]:
    """Extract every AC / RESTRICT ID declared in the matrix file.

    Only table rows whose first cell is the ID are considered. Public so
    unit tests can call it independently of pytest.

    Args:
        matrix_path: Location of the markdown traceability matrix.

    Returns:
        A sorted, deduplicated list of the IDs found.

    Raises:
        FileNotFoundError: ``matrix_path`` does not point at a regular file.
    """
    if not matrix_path.is_file():
        raise FileNotFoundError(f"traceability matrix not found at {matrix_path}")
    # Decode explicitly as UTF-8 rather than with the locale's default
    # encoding, so parsing does not depend on the host's locale settings.
    text = matrix_path.read_text(encoding="utf-8")
    ids = {
        match.group(1)
        for pattern in (_AC_ROW_RE, _RESTRICT_ROW_RE)
        for match in pattern.finditer(text)
    }
    return sorted(ids)
|
||
|
||
|
||
# ───────────────────── recorder fixture ─────────────────────
|
||
|
||
|
||
class _NfrRecorder:
    """Handle a test receives from the ``nfr_recorder`` pytest fixture.

    It validates arguments and forwards every call to the session-wide
    aggregator, tagging it with this test's scenario id and node id.
    """

    def __init__(
        self,
        scenario_id: str,
        nodeid: str,
        traces_to: tuple[str, ...],
        run: "_RunAggregator",
    ) -> None:
        self._run = run
        self.scenario_id = scenario_id
        self.nodeid = nodeid
        self.traces_to = traces_to

    def record_metric(
        self,
        name: str,
        value: Any,
        ac_id: str | None = None,
        *,
        band: str | None = None,
        ci95_low: float | None = None,
        ci95_high: float | None = None,
    ) -> None:
        """Store one numeric / structured metric for this scenario.

        Optional keyword-only annotations (AZ-446):

        * ``band`` — short human-readable AC threshold text (e.g.
          ``"≤400 ms"``). It is kept under ``"band"`` in the metric entry
          (and therefore in the per-NFR JSON) and fills the ``value_band``
          column of report.csv.
        * ``ci95_low`` / ``ci95_high`` — 95% interval bounds for the
          metric. Supply both or neither.

        Raises:
            ValueError: ``name`` is not a non-empty string, or exactly one
                of the two interval bounds was supplied.
        """
        name_ok = isinstance(name, str) and bool(name)
        if not name_ok:
            raise ValueError(f"metric name must be a non-empty str, got {name!r}")

        low_missing = ci95_low is None
        high_missing = ci95_high is None
        if low_missing ^ high_missing:
            raise ValueError(
                f"ci95_low and ci95_high must be provided together "
                f"(got low={ci95_low!r}, high={ci95_high!r})"
            )

        payload = {
            "scenario_id": self.scenario_id,
            "name": name,
            "value": value,
            "ac_id": ac_id,
            "nodeid": self.nodeid,
            "band": band,
            "ci95_low": ci95_low,
            "ci95_high": ci95_high,
        }
        self._run.record_metric(**payload)

    def partial(self, ac_id: str, reason: str) -> None:
        """Flag ``ac_id`` as PARTIAL for this scenario, with a reason.

        Raises:
            ValueError: ``ac_id`` or ``reason`` is empty.
        """
        if not (ac_id and reason):
            raise ValueError("partial() requires both ac_id and reason")
        self._run.mark_partial(
            scenario_id=self.scenario_id,
            ac_id=ac_id,
            reason=reason,
            nodeid=self.nodeid,
        )
|
||
|
||
|
||
# ───────────────────── run aggregator ─────────────────────
|
||
|
||
|
||
class _RunAggregator:
    """Plugin-scoped state for the whole pytest session.

    Holds one ``_ScenarioRecord`` per test node id and turns them into the
    evidence artifacts under ``evidence_dir`` at session end.
    """

    def __init__(
        self,
        evidence_dir: Path,
        matrix_ids: list[str],
    ) -> None:
        self.evidence_dir = evidence_dir
        self.matrix_ids = matrix_ids
        # Keyed by pytest node id (not scenario id).
        self._records: dict[str, _ScenarioRecord] = {}

    # --- mutation API used by _NfrRecorder ---

    def ensure_record(
        self, scenario_id: str, nodeid: str, traces_to: tuple[str, ...]
    ) -> _ScenarioRecord:
        """Return the record for ``nodeid``, creating it on first use."""
        rec = self._records.get(nodeid)
        if rec is None:
            rec = _ScenarioRecord(
                scenario_id=scenario_id,
                nodeid=nodeid,
                traces_to=traces_to,
            )
            self._records[nodeid] = rec
        return rec

    def record_metric(
        self,
        *,
        scenario_id: str,
        name: str,
        value: Any,
        ac_id: str | None,
        nodeid: str,
        band: str | None = None,
        ci95_low: float | None = None,
        ci95_high: float | None = None,
    ) -> None:
        """Store (or overwrite) metric ``name`` on the record for ``nodeid``.

        Optional annotations are only added to the entry when given, so
        entries recorded without them keep the two-key shape.

        Raises:
            KeyError: no record exists for ``nodeid`` (``ensure_record``
                was never called for it).
        """
        rec = self._records[nodeid]
        entry: dict[str, Any] = {"value": value, "ac_id": ac_id}
        optional = {"band": band, "ci95_low": ci95_low, "ci95_high": ci95_high}
        entry.update({key: val for key, val in optional.items() if val is not None})
        rec.metrics[name] = entry

    def mark_partial(
        self,
        *,
        scenario_id: str,
        ac_id: str,
        reason: str,
        nodeid: str,
    ) -> None:
        """Remember that ``ac_id`` is only partially covered by ``nodeid``.

        Raises:
            KeyError: no record exists for ``nodeid``.
        """
        rec = self._records[nodeid]
        rec.partial_acs[ac_id] = reason

    def set_outcome(self, nodeid: str, outcome: str) -> None:
        """Called by the plugin's logreport hook; unknown nodes are ignored."""
        rec = self._records.get(nodeid)
        if rec is not None:
            rec.outcome = outcome

    # --- read-only accessors used by tests + emission ---

    def records(self) -> list[_ScenarioRecord]:
        """Return a snapshot list of all records, in insertion order."""
        return list(self._records.values())

    # --- emission (called at session end) ---

    def emit_per_nfr_json(self) -> list[Path]:
        """One file per scenario under ``<evidence_dir>/per-nfr/``.

        The file name is the scenario id with path separators replaced by
        ``_``. Without that, an id that fell back to the pytest node id
        (``tests/x.py::test_y``) would point into a sub-directory that does
        not exist and the write would fail. Ids without separators keep
        their exact name.

        Returns:
            The written paths, in record order.
        """
        out_dir = self.evidence_dir / "per-nfr"
        out_dir.mkdir(parents=True, exist_ok=True)
        emitted: list[Path] = []
        for rec in self._records.values():
            safe_name = re.sub(r"[\\/]+", "_", rec.scenario_id)
            path = out_dir / f"{safe_name}.json"
            blob = {
                "scenario_id": rec.scenario_id,
                "nodeid": rec.nodeid,
                "traces_to": list(rec.traces_to),
                "outcome": rec.outcome or "UNKNOWN",
                "metrics": rec.metrics,
                "partial_acs": rec.partial_acs,
            }
            path.write_text(
                json.dumps(blob, sort_keys=True, indent=2) + "\n", encoding="utf-8"
            )
            emitted.append(path)
        return emitted

    def compute_traceability_status(self) -> dict:
        """Aggregate per-AC status across all recorded scenarios.

        Algorithm:

        * NOT COVERED — no scenario traces to (or marks) this AC.
        * PARTIAL — at least one scenario marks the AC PARTIAL
          OR has outcome ∈ {FAIL, SKIP}.
        * Covered — every tracing scenario has outcome ∈
          {PASS, XFAIL} and none marked PARTIAL.

        An AC marked via ``mark_partial`` counts even when it is missing
        from the scenario's ``traces_to``; otherwise the mark would be
        silently dropped from the roll-up.

        Returns:
            ``{ac_id: {"status": ..., "sources": [scenario_id, ...]}}`` with
            every matrix id present and ``sources`` sorted and deduplicated.
        """
        by_ac: dict[str, dict] = {
            ac: {"status": "NOT COVERED", "sources": []} for ac in self.matrix_ids
        }
        for rec in self._records.values():
            outcome = (rec.outcome or "").upper()
            # dict.fromkeys de-duplicates while keeping declaration order.
            for ac in dict.fromkeys((*rec.traces_to, *rec.partial_acs)):
                entry = by_ac.setdefault(ac, {"status": "NOT COVERED", "sources": []})
                entry["sources"].append(rec.scenario_id)
                if ac in rec.partial_acs or outcome in {"FAIL", "SKIP"}:
                    # FAIL / SKIP are worse than partial but still surface as
                    # PARTIAL per AZ-445 AC-2 (status enum is
                    # {Covered, PARTIAL, NOT COVERED}).
                    entry["status"] = "PARTIAL"
                elif outcome in {"PASS", "XFAIL"} and entry["status"] == "NOT COVERED":
                    # Promote NOT COVERED → Covered; keep PARTIAL pinned.
                    entry["status"] = "Covered"
                # Unknown / missing outcomes leave the status untouched.

        # Make output deterministic: sort sources within each AC entry.
        for entry in by_ac.values():
            entry["sources"] = sorted(set(entry["sources"]))
        return by_ac

    def emit_traceability_status(self) -> Path:
        """Write the roll-up to ``<evidence_dir>/traceability-status.json``."""
        path = self.evidence_dir / "traceability-status.json"
        payload = json.dumps(
            self.compute_traceability_status(), sort_keys=True, indent=2
        )
        path.write_text(payload + "\n", encoding="utf-8")
        return path

    def emit_regression_baseline(self) -> Path:
        """Flat dump of every numeric metric for diff tooling.

        Kept intentionally flat (``{metric_name: numeric_value}``) so
        regression-detection scripts can diff two baselines via a
        simple dict-walk. The AZ-446 ``band`` / ``ci95_low`` /
        ``ci95_high`` annotations live in ``report.csv`` and the per-NFR
        JSON instead — they're documentation about the metric, not
        independently diffable measurements.
        """
        path = self.evidence_dir / "regression-baseline.json"
        blob = {
            "scenarios": {
                rec.scenario_id: {
                    "metrics": {
                        name: entry["value"]
                        for name, entry in rec.metrics.items()
                        if isinstance(entry["value"], (int, float))
                    },
                    "outcome": rec.outcome or "UNKNOWN",
                }
                for rec in self._records.values()
            }
        }
        path.write_text(
            json.dumps(blob, sort_keys=True, indent=2) + "\n", encoding="utf-8"
        )
        return path

    def emit_per_metric_report(self, path: Path | None = None) -> Path:
        """AZ-446 — flat per-metric report (one row per scenario × metric).

        Default path: ``<evidence_dir>/report.csv``. Columns:

            scenario_id, metric_name, value, value_band,
            ci95_low, ci95_high, ac_id, outcome

        Non-numeric metric values are still emitted (cast to ``str``)
        so the file captures every captured signal. Rows are sorted by
        ``(scenario_id, metric_name)`` for deterministic diffing across
        runs. The file is written as UTF-8 because band text such as
        ``"≤400 ms"`` is not representable in every locale encoding.
        """
        target = path if path is not None else self.evidence_dir / "report.csv"
        target.parent.mkdir(parents=True, exist_ok=True)
        rows: list[tuple[str, str, str, str, str, str, str, str]] = [
            (
                rec.scenario_id,
                name,
                _stringify(entry.get("value")),
                _stringify(entry.get("band")),
                _stringify(entry.get("ci95_low")),
                _stringify(entry.get("ci95_high")),
                _stringify(entry.get("ac_id")),
                rec.outcome or "UNKNOWN",
            )
            for rec in self._records.values()
            for name, entry in rec.metrics.items()
        ]
        rows.sort(key=lambda r: (r[0], r[1]))
        header = [
            "scenario_id",
            "metric_name",
            "value",
            "value_band",
            "ci95_low",
            "ci95_high",
            "ac_id",
            "outcome",
        ]
        with target.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.writer(fh)
            writer.writerow(header)
            writer.writerows(rows)
        return target
|
||
|
||
|
||
# ───────────────────── pytest plugin glue ─────────────────────
|
||
|
||
|
||
# Stash slot on ``pytest.Config`` for the session's aggregator;
# ``pytest_configure`` sets it to ``None`` before the aggregator is built.
_AGGREGATOR_KEY = pytest.StashKey["_RunAggregator | None"]()
|
||
|
||
|
||
def pytest_addoption(parser: pytest.Parser) -> None:
    """Register ``--traceability-matrix`` in the ``e2e-runner`` option group."""
    help_text = (
        "Path to traceability-matrix.md (default: "
        "_docs/02_document/tests/traceability-matrix.md relative to repo root). "
        "Used to seed the NOT COVERED rows in traceability-status.json."
    )
    parser.getgroup("e2e-runner").addoption(
        "--traceability-matrix",
        action="store",
        default=None,
        help=help_text,
    )
|
||
|
||
|
||
def _resolve_matrix_path(config: pytest.Config) -> Path:
    """Return the traceability-matrix path for this run.

    A non-empty ``--traceability-matrix`` option wins. Otherwise the path
    is ``_docs/02_document/tests/traceability-matrix.md`` under the
    directory three levels above this file's folder.
    """
    override = config.getoption("--traceability-matrix")
    if not override:
        base_dir = Path(__file__).resolve().parents[3]
        return base_dir.joinpath(
            "_docs", "02_document", "tests", "traceability-matrix.md"
        )
    return Path(override)
|
||
|
||
|
||
def pytest_configure(config: pytest.Config) -> None:
    """Build the session aggregator and register the hook plugin.

    Steps, in order: reset the stash slot, parse the traceability matrix
    (a missing file is logged and treated as "no ids"), read
    ``--evidence-out``, then create and register the aggregator and
    declare the ``scenario_id`` marker.

    `--evidence-out` is owned by the runner's conftest.py; by the time
    this hook fires, that option is registered and available.
    """
    # Start from an explicit None; it is replaced once the aggregator exists.
    config.stash[_AGGREGATOR_KEY] = None

    matrix_file = _resolve_matrix_path(config)
    try:
        known_ids = parse_traceability_matrix(matrix_file)
    except FileNotFoundError:
        known_ids = []
        logger.warning(
            "traceability matrix not found at %s — NOT COVERED rows will be empty",
            matrix_file,
        )

    try:
        out_dir = config.getoption("--evidence-out")
    except ValueError:
        # The option is registered by the runner's conftest. In unit-test
        # contexts where that conftest isn't loaded, fall back to the cwd
        # so the aggregator still emits something.
        out_dir = "."

    run = _RunAggregator(Path(out_dir), known_ids)
    config.stash[_AGGREGATOR_KEY] = run
    config.pluginmanager.register(_PluginHooks(run), name="e2e-nfr-recorder")

    config.addinivalue_line(
        "markers", "scenario_id(name): explicit NFT scenario id for the per-NFR JSON"
    )
|
||
|
||
|
||
class _PluginHooks:
    """Tiny plugin instance that owns the logreport+sessionfinish hooks."""

    # pytest report outcome → label stored on the scenario record.
    _OUTCOMES = {
        "passed": "PASS",
        "failed": "FAIL",
        "skipped": "SKIP",
    }

    def __init__(self, aggregator: _RunAggregator) -> None:
        self._agg = aggregator

    def pytest_runtest_logreport(self, report: pytest.TestReport) -> None:
        """Copy the test outcome onto the matching scenario record.

        The ``call`` report is always used. A ``setup`` report is used only
        when setup did not pass, because then no ``call`` report follows
        and the record would otherwise keep an unknown outcome. Teardown
        reports are ignored.

        An expected failure arrives as ``skipped`` with a ``wasxfail``
        attribute; it is stored as ``XFAIL`` rather than ``SKIP``.
        """
        if report.when == "setup":
            if report.outcome == "passed":
                return
        elif report.when != "call":
            return

        # ``wasxfail`` may be an empty string, so test for presence only.
        if report.outcome == "skipped" and hasattr(report, "wasxfail"):
            label = "XFAIL"
        else:
            label = self._OUTCOMES.get(report.outcome, "UNKNOWN")
        self._agg.set_outcome(report.nodeid, label)

    def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int) -> None:  # noqa: ARG002
        """Write every evidence artifact once, at the end of the session."""
        self._agg.emit_per_nfr_json()
        self._agg.emit_traceability_status()
        self._agg.emit_regression_baseline()
        self._agg.emit_per_metric_report()
|
||
|
||
|
||
def _scenario_id_for(item: pytest.Item) -> str:
    """Pick the scenario id for a test item.

    Preference order: the first positional argument of the ``scenario_id``
    marker, then of the ``test_id`` marker (compat with csv_reporter), and
    finally the item's node id.
    """
    for marker_name in ("scenario_id", "test_id"):
        mark = item.get_closest_marker(marker_name)
        if mark and mark.args:
            return str(mark.args[0])
    return item.nodeid
|
||
|
||
|
||
def _traces_to_for(item: pytest.Item) -> tuple[str, ...]:
    """Read the ids listed on the item's ``traces_to`` marker.

    The ids come from the marker's first positional argument or, when
    there is none, from its ``ids`` keyword. A string is split on commas
    with blanks dropped; any other iterable is converted to a tuple.
    An item without the marker yields an empty tuple.
    """
    mark = item.get_closest_marker("traces_to")
    if mark is None:
        return ()
    raw = mark.args[0] if mark.args else mark.kwargs.get("ids", ())
    if not isinstance(raw, str):
        return tuple(raw)
    pieces = (piece.strip() for piece in raw.split(","))
    return tuple(piece for piece in pieces if piece)
|
||
|
||
|
||
@pytest.fixture
def nfr_recorder(request: pytest.FixtureRequest) -> _NfrRecorder:
    """Give an NFT scenario its handle for recording metrics + partials.

    Skips the test when no aggregator is stored on the config. Otherwise
    it makes sure a record exists for the requesting test node and wraps
    that record's identity in a fresh ``_NfrRecorder``.
    """
    run = request.config.stash.get(_AGGREGATOR_KEY, None)
    if run is None:
        pytest.skip(
            "nfr_recorder requires --evidence-out (the bundler's option) "
            "to be set; the harness configures it at runtime."
        )
    node = request.node
    record = run.ensure_record(
        _scenario_id_for(node), node.nodeid, _traces_to_for(node)
    )
    return _NfrRecorder(
        scenario_id=record.scenario_id,
        nodeid=record.nodeid,
        traces_to=record.traces_to,
        run=run,
    )
|
||
|
||
|
||
# ───────────────────── public accessors for cross-plugin use ─────────────────────
|
||
|
||
|
||
def aggregator_for(config: pytest.Config) -> _RunAggregator | None:
    """Return the aggregator stored on ``config``, or ``None`` if absent.

    Used by csv_reporter to propagate PARTIAL into the row's result column.
    """
    stash = config.stash
    return stash.get(_AGGREGATOR_KEY, None)
|