[AZ-238] [AZ-239] Add resource restart tests

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-05 06:26:15 +03:00
parent 5acd14b792
commit 2ba44a33c5
8 changed files with 274 additions and 3 deletions
+71
View File
@@ -0,0 +1,71 @@
from pathlib import Path
from e2e.replay.harness import (
BlackboxReplayRunner,
ScenarioConfig,
ScenarioGroup,
ScenarioResult,
relocalization_required,
summarize_cold_start_trials,
)
def test_disconnected_segment_triggers_relocalization_request_check() -> None:
    """Check ``relocalization_required`` for three overlap/duration combinations.

    Each case is (visual overlap fraction, disconnected duration in seconds,
    expected decision); the cases are evaluated in the listed order.
    """
    # Arrange
    cases = (
        (0.03, 0.5, True),
        (0.5, 4.0, True),
        (0.5, 1.0, False),
    )

    # Act / Assert
    for overlap_fraction, duration_s, expected in cases:
        decision = relocalization_required(
            visual_overlap_fraction=overlap_fraction,
            disconnected_duration_s=duration_s,
        )
        assert decision is expected
def test_restart_scenario_records_first_output_or_blocked_prerequisite(tmp_path: Path) -> None:
    """Run the NFT-RES-03 restart scenario and expect a BLOCKED report.

    The scenario lists ``restart-trace.tlog`` under ``tmp_path`` as a required
    path; this test never creates that file. The report must be BLOCKED, name
    the trace file in its error message, and point at an existing first
    artifact.
    """
    # Arrange
    trace_path = tmp_path / "restart-trace.tlog"
    restart_scenario = ScenarioConfig(
        scenario_id="NFT-RES-03",
        name="Companion restart recovery",
        group=ScenarioGroup.RESILIENCE,
        input_dataset="restart_trace",
        required_paths=(trace_path,),
    )
    runner = BlackboxReplayRunner(output_root=tmp_path, scenarios=(restart_scenario,))

    # Act
    outcome = runner.run()

    # Assert
    first_report = outcome.reports[0]
    assert first_report.result == ScenarioResult.BLOCKED
    assert "restart-trace.tlog" in first_report.error_message
    first_artifact = first_report.artifacts[0]
    assert first_artifact.exists()
def test_cold_start_trials_report_p95_first_fix_and_resource_spike() -> None:
    """Summarize 50 synthetic cold-start trials and check the reported verdicts.

    First-fix latencies cycle through 20.0..24.0 and peak memory starts at
    2_500_000_000, growing by 1_000_000 per trial. The summary must report 50
    trials, a p95 first-fix below 30.0, and passing first-fix and memory flags.
    """
    # Arrange
    trial_indices = range(50)
    latencies_s = tuple(20.0 + (trial % 5) for trial in trial_indices)
    memory_peaks = tuple(2_500_000_000 + trial * 1_000_000 for trial in trial_indices)

    # Act
    trial_summary = summarize_cold_start_trials(latencies_s, memory_peaks)

    # Assert
    assert trial_summary["trial_count"] == 50.0
    assert trial_summary["p95_first_fix_s"] < 30.0
    assert trial_summary["first_fix_passed"] is True
    assert trial_summary["memory_passed"] is True
def test_cold_start_hardware_prerequisites_are_blocked_not_passed(tmp_path: Path) -> None:
    """Run the NFT-RES-LIM-05 scenario and expect a BLOCKED report.

    The scenario declares ``jetson`` as a required service. The first report
    must be BLOCKED and its error message must contain
    "Jetson prerequisite blocked".
    """
    # Arrange
    spike_scenario = ScenarioConfig(
        scenario_id="NFT-RES-LIM-05",
        name="Cold-start resource spike",
        group=ScenarioGroup.RESOURCE_LIMIT,
        input_dataset="jetson_resource_monitor",
        required_services=("jetson",),
    )
    runner = BlackboxReplayRunner(output_root=tmp_path, scenarios=(spike_scenario,))

    # Act
    outcome = runner.run()

    # Assert
    first_report = outcome.reports[0]
    assert first_report.result == ScenarioResult.BLOCKED
    assert "Jetson prerequisite blocked" in first_report.error_message
+86
View File
@@ -0,0 +1,86 @@
from pathlib import Path
from e2e.replay.harness import BlackboxReplayRunner, ResourceSample, summarize_resource_samples
from fdr_observability import FdrExportRequest, FdrPayload, InMemoryFlightRecorder
from shared.contracts import FdrEvent
def test_jetson_resource_metric_summary_captures_memory_and_throttle_fields() -> None:
    """Summarize two resource samples taken 60 s apart and check the aggregates.

    The second sample has the larger shared-memory, CUDA and temperature
    readings, and neither sample has throttling active. The summary must
    report those larger values as peaks/max, a 60.0 duration, and no throttle.
    """
    # Arrange
    initial_sample = ResourceSample(
        timestamp_s=0.0,
        process_rss_bytes=1_000_000_000,
        shared_memory_used_bytes=2_000_000_000,
        cuda_allocated_bytes=500_000_000,
        throttle_active=False,
        temperature_c=55.0,
    )
    later_sample = ResourceSample(
        timestamp_s=60.0,
        process_rss_bytes=1_200_000_000,
        shared_memory_used_bytes=2_300_000_000,
        cuda_allocated_bytes=650_000_000,
        throttle_active=False,
        temperature_c=62.0,
    )
    sample_series = (initial_sample, later_sample)

    # Act
    resource_summary = summarize_resource_samples(sample_series)

    # Assert
    assert resource_summary["duration_s"] == 60.0
    assert resource_summary["peak_shared_memory_used_bytes"] == 2_300_000_000.0
    assert resource_summary["peak_cuda_allocated_bytes"] == 650_000_000.0
    assert resource_summary["throttle_observed"] is False
    assert resource_summary["max_temperature_c"] == 62.0
def test_missing_thermal_hardware_reports_blocked_prerequisite(tmp_path: Path) -> None:
    """Run the runner without an explicit scenario list and check the resource-limit report.

    The first report whose group value is "resource-limit" must have the
    result value "blocked" and an error message containing
    "Jetson prerequisite blocked".
    """
    # Act
    result = BlackboxReplayRunner(output_root=tmp_path).run()

    # Assert
    # A None default turns "no such report" into a readable assertion failure
    # instead of a bare StopIteration escaping from next().
    resource_report = next(
        (report for report in result.reports if report.group.value == "resource-limit"),
        None,
    )
    assert resource_report is not None, "runner produced no resource-limit report"
    assert resource_report.result.value == "blocked"
    assert "Jetson prerequisite blocked" in resource_report.error_message
def test_fdr_rollover_logs_segments_without_raw_frame_retention() -> None:
    """Append two 60-byte redacted payloads to a 100-byte-segment recorder and export.

    The first append must be accepted and the second must report a rollover.
    The recorder health must stay "ready", and the export must be produced
    with two segments, none of whose ids contain "raw-frame", plus an
    analytics reference.
    """
    # Arrange
    recorder = InMemoryFlightRecorder(segment_limit_bytes=100, storage_limit_bytes=500)
    gps_ref = "fdr://payload/gps-input-1"
    health_ref = "fdr://payload/health-1"

    # Act
    estimate_receipt = recorder.append_event(
        _event("estimate", 1, gps_ref),
        FdrPayload(ref=gps_ref, size_bytes=60, redacted=True),
    )
    health_receipt = recorder.append_event(
        _event("health", 2, health_ref),
        FdrPayload(ref=health_ref, size_bytes=60, redacted=True),
    )
    export_request = FdrExportRequest(
        mission_id="mission-001",
        run_id="run-001",
        include_analytics=True,
    )
    exported = recorder.export(export_request)

    # Assert
    assert estimate_receipt.appended is True
    assert health_receipt.rollover is True
    assert recorder.health.status == "ready"
    assert exported.produced is True
    assert len(exported.segments) == 2
    assert not any("raw-frame" in segment.segment_id for segment in exported.segments)
    assert exported.analytics_ref is not None
def _event(event_type: str, timestamp_ns: int, payload_ref: str) -> FdrEvent:
    """Build an ``FdrEvent`` with the fixed fields shared by this module's tests.

    Args:
        event_type: Value passed through as the event's ``event_type``.
        timestamp_ns: Value passed through as the event's ``timestamp_ns``.
        payload_ref: Value passed through as the event's ``payload_ref``.

    Returns:
        An event with component "blackbox_resource_test", severity "info",
        mission "mission-001" and run "run-001".
    """
    shared_fields = {
        "component": "blackbox_resource_test",
        "severity": "info",
        "mission_id": "mission-001",
        "run_id": "run-001",
    }
    return FdrEvent(
        event_type=event_type,
        timestamp_ns=timestamp_ns,
        payload_ref=payload_ref,
        **shared_fields,
    )