[AZ-446] CSV reporter: band + ci95 annotations + report.csv emitter

Batch 89 — adds optional `band`, `ci95_low`, `ci95_high` kw-only
parameters to `_NfrRecorder.record_metric` and emits a new per-metric
report.csv artifact (one row per scenario × metric, columns:
scenario_id, metric_name, value, value_band, ci95_low, ci95_high,
ac_id, outcome). Backwards compatible — existing 4-arg callers
unchanged; unbalanced ci95 pair raises ValueError. report.csv is
written once per pytest session from `pytest_sessionfinish` so the
annotation pass runs once per CI invocation regardless of
(fc_adapter, vio_strategy) (AC-3). `regression-baseline.json`
intentionally kept flat to preserve the diff contract used by
regression-detection tooling.

NFT-RES-03 + NFT-PERF-01 scenarios updated to pass real bands and
compute empirical 2.5/97.5-percentile ci95 from their own sample
streams (per-iteration envelope ratios for Monte Carlo,
per-frame latency samples for N-sample latency).

Tests: 1229 e2e/_unit_tests pass (+6 vs. batch 88 for AZ-446
band/CI behavior, value-error on unbalanced ci95, report.csv columns,
explicit-path override, and end-to-end emission via the pytest
plugin). Code review: PASS_WITH_WARNINGS — 1 Low (empirical-CI
semantics, documented inline), 1 Medium carried over from batch 88's
cumulative-review backlog (write_csv_evidence + _resolve_fixture_path
duplication is outside AZ-446 reporting scope).

This commit closes Step 10 Implement Tests for cycle 1 (41 of 41
blackbox-test tasks done, AZ-406..AZ-446). Greenfield auto-chains to
Step 11 Run Tests next.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-17 18:14:00 +03:00
parent 6e4a575221
commit 33e683dc0f
8 changed files with 595 additions and 13 deletions
+110 -4
View File
@@ -27,6 +27,7 @@ calling ``partial`` are recorded as Covered.
from __future__ import annotations
import csv
import json
import logging
import re
@@ -38,6 +39,15 @@ import pytest
from .csv_reporter import reporter_for
def _stringify(value: Any) -> str:
"""CSV cell projection — ``None`` → empty cell; floats keep precision."""
if value is None:
return ""
if isinstance(value, float):
return f"{value:.6g}"
return str(value)
logger = logging.getLogger(__name__)
@@ -99,16 +109,44 @@ class _NfrRecorder:
self.traces_to = traces_to
self._run = run
def record_metric(self, name: str, value: Any, ac_id: str | None = None) -> None:
"""Capture a numeric / structured metric for this scenario."""
def record_metric(
self,
name: str,
value: Any,
ac_id: str | None = None,
*,
band: str | None = None,
ci95_low: float | None = None,
ci95_high: float | None = None,
) -> None:
"""Capture a numeric / structured metric for this scenario.
Optional kwargs (AZ-446):
* ``band`` — short human-readable AC threshold text (e.g.
``"≤400 ms"``). Surfaces as ``<name>_band`` in the per-metric
report.csv and as ``"band"`` in regression-baseline.json.
* ``ci95_low`` / ``ci95_high`` — 95% interval bounds for the
metric, used by Monte Carlo (NFT-RES-03) and N-sample
(NFT-PERF-01) scenarios. Both must be passed together or
both omitted; passing only one raises ``ValueError``.
"""
if not isinstance(name, str) or not name:
raise ValueError(f"metric name must be a non-empty str, got {name!r}")
if (ci95_low is None) != (ci95_high is None):
raise ValueError(
f"ci95_low and ci95_high must be provided together "
f"(got low={ci95_low!r}, high={ci95_high!r})"
)
self._run.record_metric(
scenario_id=self.scenario_id,
name=name,
value=value,
ac_id=ac_id,
nodeid=self.nodeid,
band=band,
ci95_low=ci95_low,
ci95_high=ci95_high,
)
def partial(self, ac_id: str, reason: str) -> None:
@@ -161,9 +199,19 @@ class _RunAggregator:
value: Any,
ac_id: str | None,
nodeid: str,
band: str | None = None,
ci95_low: float | None = None,
ci95_high: float | None = None,
) -> None:
rec = self._records[nodeid]
rec.metrics[name] = {"value": value, "ac_id": ac_id}
entry: dict[str, Any] = {"value": value, "ac_id": ac_id}
if band is not None:
entry["band"] = band
if ci95_low is not None:
entry["ci95_low"] = ci95_low
if ci95_high is not None:
entry["ci95_high"] = ci95_high
rec.metrics[name] = entry
def mark_partial(
self,
@@ -254,7 +302,15 @@ class _RunAggregator:
return path
def emit_regression_baseline(self) -> Path:
"""Flat dump of every numeric metric for diff tooling."""
"""Flat dump of every numeric metric for diff tooling.
Kept intentionally flat (``{metric_name: numeric_value}``) so
regression-detection scripts can diff two baselines via a
simple dict-walk. The AZ-446 ``band`` / ``ci95_low`` /
``ci95_high`` annotations live in ``report.csv`` and the per-NFR
JSON instead — they're documentation about the metric, not
independently diffable measurements.
"""
path = self.evidence_dir / "regression-baseline.json"
blob = {
"scenarios": {
@@ -272,6 +328,55 @@ class _RunAggregator:
path.write_text(json.dumps(blob, sort_keys=True, indent=2) + "\n")
return path
def emit_per_metric_report(self, path: Path | None = None) -> Path:
    """AZ-446 — flat per-metric report (one row per scenario × metric).

    Default path: ``<evidence_dir>/report.csv``; parent directories are
    created if missing. Columns:

        scenario_id, metric_name, value, value_band,
        ci95_low, ci95_high, ac_id, outcome

    Non-numeric metric values are still emitted (cast to ``str``)
    so the file captures every captured signal; downstream tooling
    filters by ``value_band`` / ``ci95_low`` to decide what to
    treat as numeric. Rows are sorted by ``(scenario_id,
    metric_name)`` for deterministic diffing across runs. A record
    with no outcome is reported as ``UNKNOWN``.

    The file is always written as UTF-8 so that non-ASCII band text
    (e.g. ``"≤400 ms"``) is encoded the same way on every platform
    instead of depending on the locale's default encoding.

    Returns the path that was written.
    """
    target = path if path is not None else self.evidence_dir / "report.csv"
    target.parent.mkdir(parents=True, exist_ok=True)

    rows: list[tuple[str, str, str, str, str, str, str, str]] = [
        (
            rec.scenario_id,
            name,
            _stringify(entry.get("value")),
            _stringify(entry.get("band")),
            _stringify(entry.get("ci95_low")),
            _stringify(entry.get("ci95_high")),
            _stringify(entry.get("ac_id")),
            rec.outcome or "UNKNOWN",
        )
        for rec in self._records.values()
        for name, entry in rec.metrics.items()
    ]
    rows.sort(key=lambda r: (r[0], r[1]))

    # newline="" lets the csv module control line endings; the explicit
    # encoding keeps the output independent of the host locale.
    with target.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "scenario_id",
                "metric_name",
                "value",
                "value_band",
                "ci95_low",
                "ci95_high",
                "ac_id",
                "outcome",
            ]
        )
        writer.writerows(rows)
    return target
# ───────────────────── pytest plugin glue ─────────────────────
@@ -356,6 +461,7 @@ class _PluginHooks:
self._agg.emit_per_nfr_json()
self._agg.emit_traceability_status()
self._agg.emit_regression_baseline()
self._agg.emit_per_metric_report()
def _scenario_id_for(item: pytest.Item) -> str: