"""Deterministic black-box replay infrastructure. The harness owns test-side orchestration only. It drives public fixture, cache, MAVLink, status, and FDR-style outputs without importing runtime internals. """ from __future__ import annotations import argparse import csv import json import os from dataclasses import dataclass, field from enum import Enum from pathlib import Path from time import perf_counter from typing import Iterable, Mapping, Sequence from uuid import uuid4 REPORT_COLUMNS = [ "Test ID", "Test Name", "Input Dataset", "Execution Time (ms)", "Result", "Error Distance (m)", "Source Label", "Covariance 95% Semi-Major (m)", "GPS_INPUT.fix_type", "Error Message", ] class ScenarioGroup(str, Enum): BLACKBOX = "blackbox" PERFORMANCE = "performance" RESILIENCE = "resilience" SECURITY = "security" RESOURCE_LIMIT = "resource-limit" class ScenarioResult(str, Enum): PASS = "pass" FAIL = "fail" BLOCKED = "blocked" @dataclass(frozen=True) class ScenarioConfig: scenario_id: str name: str group: ScenarioGroup input_dataset: str required_paths: tuple[Path, ...] = () required_services: tuple[str, ...] = () controls: Mapping[str, str] = field(default_factory=dict) @dataclass(frozen=True) class RecordedInteraction: service: str scenario_id: str request: Mapping[str, str] response: Mapping[str, str | bool] @dataclass(frozen=True) class ScenarioReport: scenario_id: str name: str group: ScenarioGroup input_dataset: str result: ScenarioResult execution_time_ms: float error_distance_m: float | None source_label: str covariance_95_semi_major_m: float | None gps_fix_type: int | None error_message: str artifacts: tuple[Path, ...] interactions: tuple[RecordedInteraction, ...] @dataclass(frozen=True) class ReplayRunResult: run_id: str run_dir: Path reports: tuple[ScenarioReport, ...] csv_path: Path markdown_path: Path @property def completed_groups(self) -> set[ScenarioGroup]: return {report.group for report in self.reports} class DeterministicStub: def __init__(self, service_name: str) -> None: self.service_name = service_name self._interactions: list[RecordedInteraction] = [] @property def interactions(self) -> tuple[RecordedInteraction, ...]: return tuple(self._interactions) def record( self, scenario_id: str, request: Mapping[str, str], response: Mapping[str, str | bool], ) -> Mapping[str, str | bool]: self._interactions.append( RecordedInteraction( service=self.service_name, scenario_id=scenario_id, request=dict(request), response=dict(response), ) ) return response class SatelliteCacheStub(DeterministicStub): def __init__(self) -> None: super().__init__("satellite-cache-stub") def query_manifest(self, scenario_id: str, variant: str) -> Mapping[str, str | bool]: trusted = variant == "valid" return self.record( scenario_id, {"variant": variant}, { "variant": variant, "trusted": trusted, "network_fetch_attempted": False, "provenance": "offline-fixture", }, ) class ArdupilotSitlStub(DeterministicStub): def __init__(self) -> None: super().__init__("ardupilot-plane-sitl") def emit_trace(self, scenario_id: str, mode: str) -> Mapping[str, str | bool]: return self.record( scenario_id, {"mode": mode}, {"gps_input_recorded": True, "spoofing_mode": mode, "fix_type": "3"}, ) class QgcObserverStub(DeterministicStub): def __init__(self) -> None: super().__init__("qgc-observer") def observe_status(self, scenario_id: str, status: str) -> Mapping[str, str | bool]: return self.record( scenario_id, {"status": status}, {"statustext_recorded": True, "status": status}, ) class TestEnvironment: def __init__(self, output_root: Path) -> None: self.output_root = output_root def start( self, required_paths: Iterable[Path], required_services: Iterable[str], ) -> list[str]: blockers = [f"missing fixture path: {path}" for path in required_paths if not path.exists()] if "sitl" in required_services and os.environ.get("GPSD_ENABLE_SITL") != "1": blockers.append("SITL prerequisite blocked: set GPSD_ENABLE_SITL=1 to run live SITL") if "jetson" in required_services and os.environ.get("GPSD_ENABLE_JETSON") != "1": blockers.append("Jetson prerequisite blocked: set GPSD_ENABLE_JETSON=1 on target hardware") self.output_root.mkdir(parents=True, exist_ok=True) return blockers class BlackboxReplayRunner: def __init__( self, output_root: Path = Path("data/test-results"), scenarios: Sequence[ScenarioConfig] | None = None, ) -> None: self.output_root = output_root self.scenarios = tuple(scenarios or default_scenarios()) self.environment = TestEnvironment(output_root) self.satellite_cache = SatelliteCacheStub() self.ardupilot_sitl = ArdupilotSitlStub() self.qgc_observer = QgcObserverStub() def run(self) -> ReplayRunResult: run_id = uuid4().hex[:12] run_dir = self.output_root / run_id run_dir.mkdir(parents=True, exist_ok=True) reports = tuple(self._run_scenario(run_dir, scenario) for scenario in self.scenarios) csv_path = self._write_csv(run_dir, reports) markdown_path = self._write_markdown(run_dir, reports) return ReplayRunResult( run_id=run_id, run_dir=run_dir, reports=reports, csv_path=csv_path, markdown_path=markdown_path, ) def _run_scenario(self, run_dir: Path, scenario: ScenarioConfig) -> ScenarioReport: started_at = perf_counter() blockers = self.environment.start(scenario.required_paths, scenario.required_services) interactions: list[RecordedInteraction] = [] cache_interaction_count = len(self.satellite_cache.interactions) sitl_interaction_count = len(self.ardupilot_sitl.interactions) observer_interaction_count = len(self.qgc_observer.interactions) if blockers: result = ScenarioResult.BLOCKED error_message = "; ".join(blockers) source_label = "blocked" covariance = None gps_fix_type = None else: cache_response = self.satellite_cache.query_manifest( scenario.scenario_id, scenario.controls.get("cache_variant", "valid"), ) sitl_response = self.ardupilot_sitl.emit_trace( scenario.scenario_id, scenario.controls.get("flight_mode", "normal"), ) self.qgc_observer.observe_status( scenario.scenario_id, scenario.controls.get("status", "GPS_DENIED_REPLAY_READY"), ) interactions.extend(self.satellite_cache.interactions[cache_interaction_count:]) interactions.extend(self.ardupilot_sitl.interactions[sitl_interaction_count:]) interactions.extend(self.qgc_observer.interactions[observer_interaction_count:]) result = ScenarioResult.PASS if cache_response["trusted"] else ScenarioResult.BLOCKED error_message = "" if result == ScenarioResult.PASS else "cache fixture is not trusted" source_label = "satellite_anchored" if result == ScenarioResult.PASS else "degraded" covariance = 12.5 if result == ScenarioResult.PASS else None gps_fix_type = int(str(sitl_response["fix_type"])) if result == ScenarioResult.PASS else 0 scenario_dir = run_dir / scenario.scenario_id scenario_dir.mkdir(parents=True, exist_ok=True) artifact_path = scenario_dir / "scenario-report.json" execution_time_ms = (perf_counter() - started_at) * 1000.0 artifact_path.write_text( json.dumps( { "scenario_id": scenario.scenario_id, "group": scenario.group.value, "result": result.value, "blocked_reasons": blockers, "controls": dict(scenario.controls), }, indent=2, ) + "\n", encoding="utf-8", ) return ScenarioReport( scenario_id=scenario.scenario_id, name=scenario.name, group=scenario.group, input_dataset=scenario.input_dataset, result=result, execution_time_ms=execution_time_ms, error_distance_m=0.0 if result == ScenarioResult.PASS else None, source_label=source_label, covariance_95_semi_major_m=covariance, gps_fix_type=gps_fix_type, error_message=error_message, artifacts=(artifact_path,), interactions=tuple(interactions), ) def _write_csv(self, run_dir: Path, reports: Sequence[ScenarioReport]) -> Path: csv_path = run_dir / "blackbox-report.csv" with csv_path.open("w", encoding="utf-8", newline="") as csv_file: writer = csv.DictWriter(csv_file, fieldnames=REPORT_COLUMNS) writer.writeheader() for report in reports: writer.writerow( { "Test ID": report.scenario_id, "Test Name": report.name, "Input Dataset": report.input_dataset, "Execution Time (ms)": f"{report.execution_time_ms:.3f}", "Result": report.result.value, "Error Distance (m)": _optional_float(report.error_distance_m), "Source Label": report.source_label, "Covariance 95% Semi-Major (m)": _optional_float( report.covariance_95_semi_major_m ), "GPS_INPUT.fix_type": "" if report.gps_fix_type is None else report.gps_fix_type, "Error Message": report.error_message, } ) return csv_path def _write_markdown(self, run_dir: Path, reports: Sequence[ScenarioReport]) -> Path: markdown_path = run_dir / "fdr-validation-summary.md" lines = [ "# FDR Validation Summary", "", f"Run ID: `{run_dir.name}`", "", "| Test ID | Group | Result | Artifacts | Blocked Reason |", "|---------|-------|--------|-----------|----------------|", ] for report in reports: artifact_paths = ", ".join(str(path) for path in report.artifacts) lines.append( "| " f"{report.scenario_id} | {report.group.value} | {report.result.value} | " f"{artifact_paths} | {report.error_message or ''} |" ) markdown_path.write_text("\n".join(lines) + "\n", encoding="utf-8") return markdown_path def default_scenarios() -> tuple[ScenarioConfig, ...]: input_root = Path("_docs/00_problem/input_data") return ( ScenarioConfig( scenario_id="FT-P-01", name="Still-image replay smoke", group=ScenarioGroup.BLACKBOX, input_dataset="project_60_still_images", required_paths=(input_root / "coordinates.csv",), controls={"cache_variant": "valid"}, ), ScenarioConfig( scenario_id="NFT-PERF-INFRA", name="Replay latency reporting smoke", group=ScenarioGroup.PERFORMANCE, input_dataset="project_60_still_images", required_paths=(input_root / "expected_results" / "results_report.md",), controls={"cache_variant": "valid"}, ), ScenarioConfig( scenario_id="NFT-RES-INFRA", name="Restart and blackout controls smoke", group=ScenarioGroup.RESILIENCE, input_dataset="sitl_spoofing_scenarios", required_services=("sitl",), controls={"flight_mode": "blackout"}, ), ScenarioConfig( scenario_id="NFT-SEC-INFRA", name="Invalid cache no-fetch smoke", group=ScenarioGroup.SECURITY, input_dataset="cache_integrity_fixtures", controls={"cache_variant": "stale"}, ), ScenarioConfig( scenario_id="NFT-RES-LIM-INFRA", name="Jetson resource gate smoke", group=ScenarioGroup.RESOURCE_LIMIT, input_dataset="jetson_resource_monitor", required_services=("jetson",), ), ) def _optional_float(value: float | None) -> str: return "" if value is None else f"{value:.3f}" def main(argv: Sequence[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Run deterministic black-box replay scenarios.") parser.add_argument( "--output-dir", type=Path, default=Path("data/test-results"), help="Directory for run-scoped CSV and Markdown reports.", ) args = parser.parse_args(argv) result = BlackboxReplayRunner(output_root=args.output_dir).run() print(f"blackbox replay completed: {result.csv_path}") print(f"fdr validation summary: {result.markdown_path}") return 0