Files
gps-denied-onboard/tests/e2e/replay/_report_writer.py
T
Oleksandr Bezdieniezhnykh dcde602f61 [AZ-699] Real-flight validation runner + Markdown accuracy report
New e2e test runs gps-denied-replay --auto-trim against the real
derkachi.tlog + flight video + AZ-702 calibration, computes the
horizontal-error distribution (mean/p50/p95/p99 + 10/25/50/100 m
threshold-hit share), writes _docs/06_metrics/real_flight_
validation_{date}.md, and asserts honest PASS/FAIL with no @xfail
mask. AZ-404's 1-min test is untouched (sibling, not replacement).

Extends gps_compare.py with HorizontalErrorDistribution +
percentile_sorted (numpy-equivalent linear interpolation). New
test helper _report_writer.py renders the canonical Markdown
schema documented as FT-P-20 in blackbox-tests.md.

16 new unit tests pin distribution arithmetic, verdict gate,
failure-message templating (references calibration acquisition
method per AC-3), and report layout. 129 passed in focused
regression, 3 skipped (real video / Tier-2 prerequisites).
Zero new mypy --strict errors.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-20 16:53:48 +03:00

189 lines
6.8 KiB
Python

"""AZ-699 Markdown accuracy-report writer (test helper).
Renders a :class:`HorizontalErrorDistribution` (the production
helper in ``gps_denied_onboard.helpers.gps_compare``) plus run
context (calibration acquisition method, clip duration, fixture
paths) into the canonical Markdown layout consumed by
``_docs/06_metrics/real_flight_validation_{date}.md``.
This module lives under ``tests/`` (NOT production) — the report
is an artefact of running the AZ-699 e2e test. Promoting the
writer to ``src/`` would invite production code to import a test
helper, so the file ownership rule keeps it here.
Style: every function is pure; the side effect (writing the file)
is the caller's. Tests in ``tests/unit/test_az699_report_writer.py``
exercise both the rendering and the threshold-gate verdict logic.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from gps_denied_onboard.helpers.gps_compare import HorizontalErrorDistribution
__all__ = [
"AC3_GATE_PCT",
"AC3_GATE_THRESHOLD_M",
"ReportContext",
"format_failure_message",
"render_report",
"verdict_passes_ac3",
]
# AZ-696 epic AC-3 threshold + minimum-share gate. Keeping these
# named constants here (rather than inlined into the test) so the
# unit tests for the failure-message template can pin them.
AC3_GATE_THRESHOLD_M: float = 100.0
AC3_GATE_PCT: float = 80.0
@dataclass(frozen=True)
class ReportContext:
"""Run context surfaced in the report header.
Attributes:
run_date_utc: ISO-8601 date (YYYY-MM-DD) at which the run
executed — drives the report filename.
tlog_path: Real tlog the runner consumed.
video_path: Video clip the runner consumed.
calibration_acquisition_method: Provenance of the camera
calibration (e.g. ``"factory-sheet"`` for AZ-702 or
``"placeholder"`` for the adti26 fallback). Surfaced in
the failure message per AZ-699 AC-3.
clip_duration_s: Duration of the analysed clip in seconds.
emissions_count: Total estimator-output records consumed
from the JSONL (may differ from
``distribution.count`` when some emissions land
outside the GT window).
"""
run_date_utc: str
tlog_path: Path
video_path: Path
calibration_acquisition_method: str
clip_duration_s: float
emissions_count: int
def verdict_passes_ac3(distribution: HorizontalErrorDistribution) -> bool:
"""Return ``True`` when the run meets AZ-696 epic AC-3."""
if distribution.count == 0:
return False
share = distribution.threshold_hit_share.get(AC3_GATE_THRESHOLD_M)
if share is None:
return False
return share * 100.0 >= AC3_GATE_PCT
def format_failure_message(
distribution: HorizontalErrorDistribution,
context: ReportContext,
) -> str:
"""Build the honest failure message for AZ-699 AC-3.
The message references the calibration acquisition method
(factory-sheet for AZ-702 or placeholder otherwise) and the
measured residual budget, so the operator can attribute a
failure to its likely root cause (calibration uncertainty,
drift, anchor scarcity) without re-reading the source.
"""
share = distribution.threshold_hit_share.get(AC3_GATE_THRESHOLD_M, 0.0)
pct = share * 100.0
return (
f"AZ-699 AC-3: only {pct:.1f} % of {distribution.count} "
f"emissions within {AC3_GATE_THRESHOLD_M:.0f} m of ground "
f"truth; epic threshold is {AC3_GATE_PCT:.0f} %. "
f"Residual: mean={distribution.horizontal_error_mean_m:.1f} m, "
f"p50={distribution.horizontal_error_p50_m:.1f} m, "
f"p95={distribution.horizontal_error_p95_m:.1f} m, "
f"p99={distribution.horizontal_error_p99_m:.1f} m. "
f"Calibration: {context.calibration_acquisition_method}. "
"See _docs/06_metrics/real_flight_validation_"
f"{context.run_date_utc}.md for the full distribution."
)
def render_report(
distribution: HorizontalErrorDistribution,
context: ReportContext,
*,
passed: bool,
) -> str:
"""Render the full Markdown report body.
The output layout (header + horizontal-error stats + threshold
table + vertical-error stats + verdict) is the schema referenced
by ``_docs/02_document/tests/blackbox-tests.md``.
"""
verdict = "PASS" if passed else "FAIL"
horiz_rows = [
("Mean", distribution.horizontal_error_mean_m),
("p50", distribution.horizontal_error_p50_m),
("p95", distribution.horizontal_error_p95_m),
("p99", distribution.horizontal_error_p99_m),
]
threshold_rows = [
(t, share)
for t, share in sorted(distribution.threshold_hit_share.items())
]
lines: list[str] = []
lines.append(f"# Real-flight validation — {context.run_date_utc}")
lines.append("")
lines.append(f"**Verdict**: {verdict} (AC-3 gate: "
f"{AC3_GATE_PCT:.0f} % within "
f"{AC3_GATE_THRESHOLD_M:.0f} m)")
lines.append("")
lines.append("## Run context")
lines.append("")
lines.append(f"- Tlog: `{context.tlog_path}`")
lines.append(f"- Video: `{context.video_path}`")
lines.append(
f"- Calibration acquisition method: {context.calibration_acquisition_method}"
)
lines.append(f"- Clip duration: {context.clip_duration_s:.1f} s")
lines.append(f"- Emissions consumed: {context.emissions_count}")
lines.append(f"- Ground-truth pairings: {distribution.count}")
lines.append("")
lines.append("## Horizontal error (metres)")
lines.append("")
lines.append("| Statistic | Value |")
lines.append("| --------- | ----- |")
for name, value in horiz_rows:
lines.append(f"| {name} | {value:.2f} |")
lines.append("")
lines.append("## Threshold-hit share")
lines.append("")
lines.append("| Threshold (m) | Hit share (%) |")
lines.append("| ------------- | ------------- |")
for threshold, share in threshold_rows:
lines.append(f"| {threshold:g} | {share * 100.0:.1f} |")
lines.append("")
if distribution.vertical_count > 0:
lines.append("## Vertical error (metres)")
lines.append("")
lines.append("| Statistic | Value |")
lines.append("| --------- | ----- |")
lines.append(
f"| Mean | {distribution.vertical_error_mean_m:.2f} |"
)
lines.append(
f"| p50 | {distribution.vertical_error_p50_m:.2f} |"
)
lines.append(
f"| p95 | {distribution.vertical_error_p95_m:.2f} |"
)
lines.append(
f"| Samples | {distribution.vertical_count} |"
)
lines.append("")
else:
lines.append("## Vertical error")
lines.append("")
lines.append("_No emissions carried a comparable altitude — vertical stats skipped._")
lines.append("")
return "\n".join(lines) + "\n"