start over again

This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-07 04:08:03 +03:00
parent ee6606a9c2
commit 8382cdae10
351 changed files with 0 additions and 30337 deletions
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-13
View File
@@ -1,13 +0,0 @@
"""Black-box runner entry point."""
from collections.abc import Sequence
from e2e.replay.harness import main as replay_main
def main(argv: Sequence[str] | None = None) -> int:
return replay_main(argv)
if __name__ == "__main__":
raise SystemExit(main())
-1
View File
@@ -1 +0,0 @@
@@ -1 +0,0 @@
-117
View File
@@ -1,117 +0,0 @@
from e2e.replay.harness import mavlink_source_is_authorized
from mavlink_gcs_integration import InMemoryMavlinkGateway, OperatorStatusMessage
from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext
from shared.contracts import VioStatePacket
def test_blackout_trace_transitions_to_dead_reckoned_then_no_fix() -> None:
# Arrange
state_machine = SafetyAnchorStateMachine(
SafetyStateConfig(
initial_covariance_m=2.0,
dead_reckoning_growth_m=125.0,
no_fix_covariance_threshold_m=500.0,
)
)
state_machine.update_vio(
VioStatePacket(
timestamp_ns=1_000_000_000,
relative_pose={"x_m": 0.0},
velocity_mps=(0.0, 0.0, 0.0),
tracking_quality=0.9,
covariance_hint=[[2.0, 0.0], [0.0, 2.0]],
),
TelemetryContext(
timestamp_ns=1_000_000_000,
latitude_hint_deg=48.0,
longitude_hint_deg=37.0,
altitude_m=400.0,
),
)
# Act
snapshots = tuple(
state_machine.propagate_blackout(1_000_000_000 + index * 1_000_000_000)
for index in range(1, 6)
)
# Assert
assert snapshots[0].mode == "dead_reckoned"
assert snapshots[-1].mode == "no_fix"
covariances = tuple(snapshot.estimate.covariance_semimajor_m for snapshot in snapshots)
assert covariances == tuple(sorted(covariances))
assert snapshots[-1].estimate.fix_type == 0
assert snapshots[-1].estimate.horizontal_accuracy_m >= 999.0
def test_no_fix_estimate_is_not_emitted_as_confident_gps_input() -> None:
# Arrange
state_machine = SafetyAnchorStateMachine(
SafetyStateConfig(dead_reckoning_growth_m=600.0, no_fix_covariance_threshold_m=500.0)
)
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000_000_000)
state_machine.update_vio(
VioStatePacket(
timestamp_ns=1,
relative_pose={"x_m": 0.0},
velocity_mps=(0.0, 0.0, 0.0),
tracking_quality=0.5,
),
TelemetryContext(
timestamp_ns=1,
latitude_hint_deg=48.0,
longitude_hint_deg=37.0,
altitude_m=400.0,
),
)
no_fix_snapshot = state_machine.propagate_blackout(2)
# Act
emission = gateway.emit_gps_input(no_fix_snapshot.estimate)
# Assert
assert emission.emitted is False
assert emission.error is not None
assert "unsafe for GPS_INPUT" in emission.error.message
def test_unauthorized_mavlink_sources_are_rejected_by_test_assertion() -> None:
# Arrange
allowed_source_system_ids = {1, 42}
# Act / Assert
assert mavlink_source_is_authorized(42, allowed_source_system_ids) is True
assert mavlink_source_is_authorized(99, allowed_source_system_ids) is False
def test_qgc_status_and_fdr_evidence_are_visible_and_rate_limited() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=2_000_000_000)
messages = [
OperatorStatusMessage(
timestamp_ns=1_000_000_000,
severity="warning",
text="VISUAL_BLACKOUT_IMU_ONLY",
),
OperatorStatusMessage(
timestamp_ns=2_000_000_000,
severity="warning",
text="VISUAL_BLACKOUT_IMU_ONLY",
),
OperatorStatusMessage(
timestamp_ns=4_000_000_000,
severity="critical",
text="VISUAL_BLACKOUT_FAILSAFE",
),
]
# Act
result = gateway.emit_status(messages)
# Assert
assert [message.text for message in result.emitted] == [
"VISUAL_BLACKOUT_IMU_ONLY",
"VISUAL_BLACKOUT_FAILSAFE",
]
assert len(result.suppressed) == 1
assert all(message.visible_to_qgc for message in result.emitted)
-71
View File
@@ -1,71 +0,0 @@
from pathlib import Path
from e2e.replay.harness import (
BlackboxReplayRunner,
ScenarioConfig,
ScenarioGroup,
ScenarioResult,
relocalization_required,
summarize_cold_start_trials,
)
def test_disconnected_segment_triggers_relocalization_request_check() -> None:
# Act / Assert
assert relocalization_required(visual_overlap_fraction=0.03, disconnected_duration_s=0.5) is True
assert relocalization_required(visual_overlap_fraction=0.5, disconnected_duration_s=4.0) is True
assert relocalization_required(visual_overlap_fraction=0.5, disconnected_duration_s=1.0) is False
def test_restart_scenario_records_first_output_or_blocked_prerequisite(tmp_path: Path) -> None:
# Arrange
scenario = ScenarioConfig(
scenario_id="NFT-RES-03",
name="Companion restart recovery",
group=ScenarioGroup.RESILIENCE,
input_dataset="restart_trace",
required_paths=(tmp_path / "restart-trace.tlog",),
)
# Act
result = BlackboxReplayRunner(output_root=tmp_path, scenarios=(scenario,)).run()
# Assert
report = result.reports[0]
assert report.result == ScenarioResult.BLOCKED
assert "restart-trace.tlog" in report.error_message
assert report.artifacts[0].exists()
def test_cold_start_trials_report_p95_first_fix_and_resource_spike() -> None:
# Arrange
first_fix_latencies_s = tuple(20.0 + (index % 5) for index in range(50))
peak_memory_bytes = tuple(2_500_000_000 + index * 1_000_000 for index in range(50))
# Act
summary = summarize_cold_start_trials(first_fix_latencies_s, peak_memory_bytes)
# Assert
assert summary["trial_count"] == 50.0
assert summary["p95_first_fix_s"] < 30.0
assert summary["first_fix_passed"] is True
assert summary["memory_passed"] is True
def test_cold_start_hardware_prerequisites_are_blocked_not_passed(tmp_path: Path) -> None:
# Arrange
scenario = ScenarioConfig(
scenario_id="NFT-RES-LIM-05",
name="Cold-start resource spike",
group=ScenarioGroup.RESOURCE_LIMIT,
input_dataset="jetson_resource_monitor",
required_services=("jetson",),
)
# Act
result = BlackboxReplayRunner(output_root=tmp_path, scenarios=(scenario,)).run()
# Assert
report = result.reports[0]
assert report.result == ScenarioResult.BLOCKED
assert "Jetson prerequisite blocked" in report.error_message
-96
View File
@@ -1,96 +0,0 @@
import csv
from pathlib import Path
from e2e.replay.harness import (
REPORT_COLUMNS,
BlackboxReplayRunner,
SatelliteCacheStub,
ScenarioConfig,
ScenarioGroup,
ScenarioResult,
)
def test_replay_environment_reports_missing_prerequisites_as_blocked(tmp_path: Path) -> None:
# Arrange
scenario = ScenarioConfig(
scenario_id="BLOCKED-INFRA",
name="Blocked prerequisite smoke",
group=ScenarioGroup.RESILIENCE,
input_dataset="sitl_spoofing_scenarios",
required_paths=(tmp_path / "missing-fixture.csv",),
required_services=("sitl",),
)
# Act
result = BlackboxReplayRunner(output_root=tmp_path, scenarios=(scenario,)).run()
# Assert
report = result.reports[0]
assert report.result == ScenarioResult.BLOCKED
assert "missing fixture path" in report.error_message
assert "SITL prerequisite blocked" in report.error_message
def test_satellite_cache_stub_is_deterministic_and_records_interactions() -> None:
# Arrange
stub = SatelliteCacheStub()
# Act
first = stub.query_manifest("FT-P-01", "valid")
second = stub.query_manifest("FT-P-01", "valid")
# Assert
assert first == second
assert first["network_fetch_attempted"] is False
assert len(stub.interactions) == 2
assert stub.interactions[0].service == "satellite-cache-stub"
def test_runner_executes_all_required_groups_and_writes_reports(tmp_path: Path) -> None:
# Act
result = BlackboxReplayRunner(output_root=tmp_path).run()
# Assert
assert result.completed_groups == set(ScenarioGroup)
rows = list(csv.DictReader(result.csv_path.open(encoding="utf-8")))
assert rows
assert rows[0].keys() == set(REPORT_COLUMNS)
assert {row["Result"] for row in rows} <= {"pass", "blocked"}
security_row = next(row for row in rows if row["Test ID"] == "NFT-SEC-INFRA")
assert security_row["Result"] == "pass"
assert security_row["Source Label"] == "untrusted_cache_rejected"
markdown = result.markdown_path.read_text(encoding="utf-8")
assert "FDR Validation Summary" in markdown
assert "SITL prerequisite blocked" in markdown
assert "Jetson prerequisite blocked" in markdown
def test_runner_uses_configurable_input_root_for_replay_fixtures(tmp_path: Path) -> None:
# Arrange
input_root = tmp_path / "input"
(input_root / "expected_results").mkdir(parents=True)
(input_root / "coordinates.csv").write_text("image,lat,lon\nAD000001.jpg,48.0,37.0\n")
(input_root / "expected_results" / "results_report.md").write_text("# Expected results\n")
# Act
result = BlackboxReplayRunner(output_root=tmp_path / "output", input_root=input_root).run()
# Assert
reports_by_id = {report.scenario_id: report for report in result.reports}
assert reports_by_id["FT-P-01"].result == ScenarioResult.PASS
assert reports_by_id["NFT-PERF-INFRA"].result == ScenarioResult.PASS
def test_runner_keeps_generated_artifacts_run_scoped(tmp_path: Path) -> None:
# Act
result = BlackboxReplayRunner(output_root=tmp_path).run()
# Assert
assert result.run_dir.parent == tmp_path
assert result.csv_path.parent == result.run_dir
assert result.markdown_path.parent == result.run_dir
for report in result.reports:
assert report.artifacts
assert all(artifact.parent.parent == result.run_dir for artifact in report.artifacts)
-86
View File
@@ -1,86 +0,0 @@
from pathlib import Path
from e2e.replay.harness import BlackboxReplayRunner, ResourceSample, summarize_resource_samples
from fdr_observability import FdrExportRequest, FdrPayload, InMemoryFlightRecorder
from shared.contracts import FdrEvent
def test_jetson_resource_metric_summary_captures_memory_and_throttle_fields() -> None:
# Arrange
samples = (
ResourceSample(
timestamp_s=0.0,
process_rss_bytes=1_000_000_000,
shared_memory_used_bytes=2_000_000_000,
cuda_allocated_bytes=500_000_000,
throttle_active=False,
temperature_c=55.0,
),
ResourceSample(
timestamp_s=60.0,
process_rss_bytes=1_200_000_000,
shared_memory_used_bytes=2_300_000_000,
cuda_allocated_bytes=650_000_000,
throttle_active=False,
temperature_c=62.0,
),
)
# Act
summary = summarize_resource_samples(samples)
# Assert
assert summary["duration_s"] == 60.0
assert summary["peak_shared_memory_used_bytes"] == 2_300_000_000.0
assert summary["peak_cuda_allocated_bytes"] == 650_000_000.0
assert summary["throttle_observed"] is False
assert summary["max_temperature_c"] == 62.0
def test_missing_thermal_hardware_reports_blocked_prerequisite(tmp_path: Path) -> None:
# Act
result = BlackboxReplayRunner(output_root=tmp_path).run()
# Assert
resource_report = next(report for report in result.reports if report.group.value == "resource-limit")
assert resource_report.result.value == "blocked"
assert "Jetson prerequisite blocked" in resource_report.error_message
def test_fdr_rollover_logs_segments_without_raw_frame_retention() -> None:
# Arrange
recorder = InMemoryFlightRecorder(segment_limit_bytes=100, storage_limit_bytes=500)
# Act
first = recorder.append_event(
_event("estimate", 1, "fdr://payload/gps-input-1"),
FdrPayload(ref="fdr://payload/gps-input-1", size_bytes=60, redacted=True),
)
second = recorder.append_event(
_event("health", 2, "fdr://payload/health-1"),
FdrPayload(ref="fdr://payload/health-1", size_bytes=60, redacted=True),
)
export = recorder.export(
FdrExportRequest(mission_id="mission-001", run_id="run-001", include_analytics=True)
)
# Assert
assert first.appended is True
assert second.rollover is True
assert recorder.health.status == "ready"
assert export.produced is True
assert len(export.segments) == 2
assert all("raw-frame" not in segment.segment_id for segment in export.segments)
assert export.analytics_ref is not None
def _event(event_type: str, timestamp_ns: int, payload_ref: str) -> FdrEvent:
return FdrEvent(
event_type=event_type,
timestamp_ns=timestamp_ns,
component="blackbox_resource_test",
severity="info",
payload_ref=payload_ref,
mission_id="mission-001",
run_id="run-001",
)
-123
View File
@@ -1,123 +0,0 @@
from anchor_verification import AnchorFrame, CandidateTile, GeometryGatedAnchorVerifier
from e2e.replay.harness import SatelliteCacheStub
from satellite_service import (
LocalVprIndexPackage,
LocalVprRetriever,
RelocalizationRequest,
SatelliteSyncBoundary,
VprDescriptorRecord,
)
from shared.contracts import VprCandidate
from tile_manager import GeneratedTileSyncPackage
def test_verified_anchor_includes_retrieval_matching_and_provenance_evidence() -> None:
# Arrange
retriever = LocalVprRetriever()
retriever.load_index(
LocalVprIndexPackage(
package_id="fixture-index",
records=(
VprDescriptorRecord(
chunk_id="chunk-001",
tile_id="tile-001",
descriptor=(1.0, 0.0, 0.0),
footprint={"min_lat": 48.0, "max_lat": 48.1, "min_lon": 37.0, "max_lon": 37.1},
freshness_status="fresh",
),
),
)
)
retrieval = retriever.retrieve(
RelocalizationRequest(
frame_id="frame-001",
image_ref="AD000001.jpg",
trigger_reason="cold_start",
top_k=1,
query_descriptor=(1.0, 0.0, 0.0),
)
)
keypoints = tuple((float(index), float(index % 5)) for index in range(24))
shifted_keypoints = tuple((x + 1.0, y + 1.0) for x, y in keypoints)
verifier = GeometryGatedAnchorVerifier()
# Act
verification = verifier.verify_candidate(
AnchorFrame(frame_id="frame-001", image_ref="AD000001.jpg", keypoints=keypoints),
CandidateTile(
candidate=retrieval.candidates[0],
image_ref="tile-001.cog",
keypoints=shifted_keypoints,
provenance_trusted=True,
),
)
# Assert
assert retrieval.ready is True
assert retrieval.latency_ms is not None
assert verification.decision.accepted is True
assert verification.decision.candidate_id == "chunk-001"
assert verification.decision.inliers >= 20
assert verification.decision.mean_reprojection_error_px <= 3.0
assert verification.homography is not None
assert verification.freshness_status == "fresh"
def test_unsafe_cache_or_low_texture_candidates_never_emit_trusted_anchor() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(
frame_id="frame-low-texture",
image_ref="low-texture.jpg",
usable_for_anchor=False,
keypoints=((0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (3.0, 3.0)),
)
candidate = VprCandidate(
chunk_id="chunk-stale",
tile_id="tile-stale",
score=0.9,
footprint={"min_lat": 48.0, "max_lat": 48.1, "min_lon": 37.0, "max_lon": 37.1},
freshness_status="stale",
)
# Act
verification = verifier.verify_candidate(
frame,
CandidateTile(
candidate=candidate,
image_ref="tile-stale.cog",
keypoints=((0.0, 0.0), (1.0, 1.0), (2.0, 2.0), (3.0, 3.0)),
provenance_trusted=False,
),
)
# Assert
assert verification.decision.accepted is False
assert verification.decision.rejection_reason == "frame_not_usable"
def test_flight_mode_missing_cache_does_not_attempt_external_access() -> None:
# Arrange
cache_stub = SatelliteCacheStub()
sync_boundary = SatelliteSyncBoundary()
# Act
cache_response = cache_stub.query_manifest("NFT-SEC-04", "missing")
sync_result = sync_boundary.upload_generated_tiles(
GeneratedTileSyncPackage(
package_ref="generated-empty",
mission_id="mission-001",
manifest_delta=(),
sidecars=(),
),
phase="in_flight",
)
# Assert
assert cache_response["network_fetch_attempted"] is False
assert cache_response["trusted"] is False
assert int(str(cache_response["fixture_size_bytes"])) < int(
str(cache_response["storage_budget_bytes"])
)
assert sync_result.error is not None
assert sync_result.error.cause == "mid_flight_network_blocked"
-68
View File
@@ -1,68 +0,0 @@
from pathlib import Path
import pytest
from e2e.replay.harness import (
ReplayEstimate,
evaluate_still_image_estimates,
load_expected_coordinates,
)
def test_expected_coordinate_loader_rejects_invalid_wgs84_rows(tmp_path: Path) -> None:
# Arrange
coordinates_path = tmp_path / "coordinates.csv"
coordinates_path.write_text("image, lat, lon\nAD000001.jpg, 120.0, 37.0\n", encoding="utf-8")
# Act / Assert
with pytest.raises(ValueError, match="outside WGS84 bounds"):
load_expected_coordinates(coordinates_path)
def test_still_image_replay_reports_coordinate_thresholds_and_latency() -> None:
# Arrange
expected = load_expected_coordinates(Path("_docs/00_problem/input_data/coordinates.csv"))
estimates = tuple(
ReplayEstimate(
image_ref=coordinate.image_ref,
latitude_deg=coordinate.latitude_deg + 0.00001,
longitude_deg=coordinate.longitude_deg + 0.00001,
covariance_95_semi_major_m=8.0,
source_label="satellite_anchored",
anchor_age_ms=150,
capture_to_output_latency_ms=40.0 + index,
)
for index, coordinate in enumerate(expected)
)
# Act
metrics = evaluate_still_image_estimates(expected, estimates)
# Assert
assert metrics["threshold_passed"] is True
assert metrics["within_50_m_rate"] >= 0.80
assert metrics["within_20_m_rate"] >= 0.50
assert metrics["p50_latency_ms"] > 0.0
assert metrics["p95_latency_ms"] >= metrics["p50_latency_ms"]
assert metrics["p99_latency_ms"] >= metrics["p95_latency_ms"]
assert metrics["dropped_frame_rate"] == 0.0
def test_confidence_contract_validation_fails_missing_source_label() -> None:
# Arrange
expected = load_expected_coordinates(Path("_docs/00_problem/input_data/coordinates.csv"))[:1]
estimates = (
ReplayEstimate(
image_ref=expected[0].image_ref,
latitude_deg=expected[0].latitude_deg,
longitude_deg=expected[0].longitude_deg,
covariance_95_semi_major_m=8.0,
source_label="",
anchor_age_ms=0,
capture_to_output_latency_ms=10.0,
),
)
# Act / Assert
with pytest.raises(ValueError, match="source label is missing"):
evaluate_still_image_estimates(expected, estimates)
-88
View File
@@ -1,88 +0,0 @@
from pathlib import Path
import pytest
from e2e.replay.harness import (
BlackboxReplayRunner,
ScenarioConfig,
ScenarioGroup,
ScenarioResult,
validate_derkachi_alignment,
)
from shared.contracts import FramePacket, TelemetrySample
from vio_adapter import VioInputPacket, VioRuntimeConfig, create_vio_adapter
def test_derkachi_alignment_validator_accepts_expected_fixture_shape() -> None:
# Act
metrics = validate_derkachi_alignment(
video_duration_s=490.07,
telemetry_duration_s=490.07,
telemetry_rows=4_900,
)
# Assert
assert metrics["alignment_valid"] is True
assert metrics["duration_delta_s"] == 0.0
assert metrics["frames_per_telemetry"] == pytest.approx(3.0, abs=0.05)
def test_derkachi_alignment_validator_blocks_duration_drift() -> None:
# Act / Assert
with pytest.raises(ValueError, match="more than 250 ms"):
validate_derkachi_alignment(
video_duration_s=490.07,
telemetry_duration_s=489.50,
telemetry_rows=4_900,
)
def test_public_vio_replay_boundary_emits_frame_by_frame_estimate() -> None:
# Arrange
adapter = create_vio_adapter(VioRuntimeConfig(environment="development", mode="replay"))
frame = FramePacket(
frame_id="derkachi-0001",
timestamp_ns=1_000_000_000,
image_ref="_docs/00_problem/input_data/flight_derkachi/flight_derkachi.mp4#0",
calibration_id="derkachi-calibration-gated",
occlusion="clear",
quality=0.9,
)
telemetry = (
TelemetrySample(
timestamp_ns=1_000_000_000,
imu={"accel_x": 0.0, "accel_y": 0.0, "accel_z": -9.8},
attitude={"roll": 0.0, "pitch": 0.0, "yaw": 1.0},
altitude_m=400.0,
airspeed_mps=22.0,
gps_health="healthy",
),
)
# Act
result = adapter.process(VioInputPacket(frame=frame, telemetry_samples=telemetry))
# Assert
assert result.state_packet is not None
assert result.health.state == "ready"
assert result.state_packet.timestamp_ns == frame.timestamp_ns
assert result.state_packet.tracking_quality > 0.0
def test_public_dataset_and_calibration_prerequisites_are_reported_blocked(tmp_path: Path) -> None:
# Arrange
scenario = ScenarioConfig(
scenario_id="FT-P-03-CALIBRATION",
name="Calibration-gated public VIO dataset",
group=ScenarioGroup.PERFORMANCE,
input_dataset="public_nadir_vio_candidates",
required_paths=(tmp_path / "camera_intrinsics.yaml",),
)
# Act
result = BlackboxReplayRunner(output_root=tmp_path, scenarios=(scenario,)).run()
# Assert
report = result.reports[0]
assert report.result == ScenarioResult.BLOCKED
assert "camera_intrinsics.yaml" in report.error_message
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
@@ -1 +0,0 @@
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@
@@ -1,65 +0,0 @@
from shared.config import validate_runtime_profile
from shared.errors import ErrorEnvelope, ResultEnvelope
from shared.telemetry import HealthEvent, MetricsLabels
def test_missing_production_cache_dir_returns_readiness_failure() -> None:
# Arrange
payload = {
"environment": "production",
"config_dir": "/etc/gps-denied-onboard",
"fdr_dir": "/var/lib/gps-denied/fdr",
"database_url": "postgresql://localhost/gpsd",
"mavlink_url": "serial:/dev/ttyTHS1:921600",
"camera_source": "hardware",
"signing_key_ref": "secret-ref",
}
# Act
result = validate_runtime_profile("runtime", payload)
# Assert
assert result.ok is False
assert result.error is not None
assert result.error.component == "runtime"
assert result.error.category == "configuration"
assert result.error.severity == "critical"
assert result.error.retryable is False
def test_dependency_error_envelope_has_required_structured_fields() -> None:
# Act
result = ResultEnvelope.failure(
ErrorEnvelope(
component="tile_manager",
category="dependency",
message="postgis unavailable",
severity="error",
retryable=True,
cause="connection refused",
)
)
# Assert
assert result.ok is False
assert result.error is not None
assert result.error.component == "tile_manager"
assert result.error.category == "dependency"
assert result.error.severity == "error"
assert result.error.retryable is True
def test_health_event_and_metrics_labels_are_fdr_safe_metadata() -> None:
# Act
health = HealthEvent(
component="runtime",
timestamp_ns=1,
liveness="alive",
readiness="ready",
dependency_state={"postgis": "ready"},
)
labels = MetricsLabels(component="runtime", action="startup", status="ok")
# Assert
assert health.dependency_state["postgis"] == "ready"
assert labels.status == "ok"
@@ -1,41 +0,0 @@
from shared.geo_geometry import Wgs84Coordinate, distance_m, local_to_wgs84, wgs84_to_local
from shared.time_sync import check_monotonic_timestamps, select_time_window
def test_wgs84_local_round_trip_is_deterministic() -> None:
# Arrange
origin = Wgs84Coordinate(latitude_deg=49.9808, longitude_deg=36.2527, altitude_m=120.0)
point = Wgs84Coordinate(latitude_deg=49.9811, longitude_deg=36.2531, altitude_m=118.0)
# Act
local = wgs84_to_local(origin, point)
round_trip = local_to_wgs84(origin, local)
# Assert
assert round(round_trip.latitude_deg, 7) == round(point.latitude_deg, 7)
assert round(round_trip.longitude_deg, 7) == round(point.longitude_deg, 7)
assert round(round_trip.altitude_m, 7) == round(point.altitude_m, 7)
assert distance_m(origin, point) > 0.0
def test_non_monotonic_timestamps_return_explicit_violation() -> None:
# Act
violations = check_monotonic_timestamps([100, 200, 150])
# Assert
assert len(violations) == 1
assert violations[0].category == "timestamp_mismatch"
def test_time_window_reports_gap_instead_of_dropping_silently() -> None:
# Act
result = select_time_window(
frame_timestamp_ns=1_000,
sample_timestamps_ns=[100, 200, 300],
tolerance_ns=50,
)
# Assert
assert result.ok is False
assert result.sample_timestamps_ns == ()
assert result.violations[0].category == "gap_exceeded"
-157
View File
@@ -1,157 +0,0 @@
import pytest
from pydantic import ValidationError
from shared.contracts import (
AnchorDecision,
CacheTileRecord,
FdrEvent,
FramePacket,
PositionEstimate,
TelemetrySample,
VioStatePacket,
VprCandidate,
)
def test_runtime_dtos_accept_valid_minimal_values() -> None:
# Arrange
timestamp_ns = 1_000_000
# Act
contracts = [
FramePacket(
frame_id="frame-1",
timestamp_ns=timestamp_ns,
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
),
TelemetrySample(
timestamp_ns=timestamp_ns,
imu={"accel_x": 0.1},
attitude={"roll": 0.0},
altitude_m=950.0,
airspeed_mps=16.0,
gps_health="lost",
),
VioStatePacket(
timestamp_ns=timestamp_ns,
relative_pose={"x": 1.0},
velocity_mps=(1.0, 0.0, 0.0),
tracking_quality=0.8,
),
PositionEstimate(
timestamp_ns=timestamp_ns,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=950.0,
covariance_semimajor_m=12.0,
source_label="satellite_anchored",
fix_type=3,
horizontal_accuracy_m=12.0,
anchor_age_ms=200,
),
VprCandidate(
chunk_id="chunk-1",
tile_id="tile-1",
score=0.87,
footprint={"min_lat": 49.0},
freshness_status="fresh",
),
AnchorDecision(
candidate_id="candidate-1",
accepted=True,
estimated_pose={"x": 1.0},
inliers=42,
mean_reprojection_error_px=0.8,
),
CacheTileRecord(
tile_id="tile-1",
crs="EPSG:3857",
meters_per_pixel=0.3,
capture_date="2026-05-03",
signature_hash="sha256:abc",
trust_level="trusted",
freshness_status="fresh",
provenance="suite-satellite-service",
),
FdrEvent(
event_type="health",
timestamp_ns=timestamp_ns,
component="shared.contracts",
severity="info",
payload_ref="fdr://segment/1",
mission_id="mission-1",
run_id="run-1",
),
]
# Assert
assert len(contracts) == 8
def test_missing_required_timestamp_is_rejected_with_structured_error() -> None:
# Act
with pytest.raises(ValidationError) as error:
FramePacket(
frame_id="frame-1",
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
)
# Assert
assert error.value.errors()[0]["loc"] == ("timestamp_ns",)
assert error.value.errors()[0]["type"] == "missing"
def test_raw_frame_retention_is_rejected() -> None:
# Act
with pytest.raises(ValidationError) as error:
FramePacket(
frame_id="frame-1",
timestamp_ns=1,
image_ref="frames/frame-1",
calibration_id="calib-1",
occlusion="clear",
quality=0.95,
raw_frame_retained=True,
)
# Assert
assert "raw frame payloads must be referenced" in str(error.value)
def test_position_accuracy_cannot_under_report_covariance() -> None:
# Act
with pytest.raises(ValidationError) as error:
PositionEstimate(
timestamp_ns=1,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=950.0,
covariance_semimajor_m=50.0,
source_label="satellite_anchored",
fix_type=3,
horizontal_accuracy_m=10.0,
anchor_age_ms=200,
)
# Assert
assert "must not under-report" in str(error.value)
def test_accepted_anchor_requires_estimated_pose() -> None:
# Act
with pytest.raises(ValidationError) as error:
AnchorDecision(
candidate_id="candidate-1",
accepted=True,
inliers=42,
mean_reprojection_error_px=0.8,
)
# Assert
assert "accepted anchor decisions require estimated_pose" in str(error.value)
-170
View File
@@ -1,170 +0,0 @@
from anchor_verification import AnchorFrame, CandidateTile, GeometryGatedAnchorVerifier, MatchEvidence
from shared.contracts import VprCandidate
def _candidate(freshness_status: str = "fresh") -> VprCandidate:
return VprCandidate(
chunk_id="chunk-1",
tile_id="tile-1",
score=0.91,
footprint={"min_lat": 49.0, "max_lat": 49.2, "min_lon": 36.0, "max_lon": 36.2},
freshness_status=freshness_status,
)
def _evidence(**overrides: object) -> MatchEvidence:
payload: dict[str, object] = {
"candidate": _candidate(),
"matcher_profile": "aliked_lightglue",
"inliers": 48,
"mean_reprojection_error_px": 1.4,
"homography": {"h00": 1.0, "h11": 1.0, "h22": 1.0},
"runtime_ms": 72.5,
"provenance_trusted": True,
}
payload.update(overrides)
return MatchEvidence.model_validate(payload)
def _frame_with_keypoints() -> AnchorFrame:
keypoints = tuple((float(x * 10), float(y * 10)) for x in range(5) for y in range(5))
return AnchorFrame(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
keypoints=keypoints,
)
def _tile_with_keypoints(**overrides: object) -> CandidateTile:
keypoints = tuple(
(float(x * 10 + 2), float(y * 10 + 3)) for x in range(5) for y in range(5)
)
payload: dict[str, object] = {
"candidate": _candidate(),
"image_ref": "cache/tile-1.tif",
"keypoints": keypoints,
"provenance_trusted": True,
}
payload.update(overrides)
return CandidateTile.model_validate(payload)
def test_candidate_verification_emits_acceptance_evidence() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
# Act
result = verifier.verify(frame, _evidence())
# Assert
assert result.decision.accepted is True
assert result.decision.inliers == 48
assert result.decision.mean_reprojection_error_px == 1.4
assert result.reason == "accepted_geometry"
assert result.homography == {"h00": 1.0, "h11": 1.0, "h22": 1.0}
def test_matching_path_computes_evidence_from_frame_and_tile_inputs() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = _frame_with_keypoints()
tile = _tile_with_keypoints()
# Act
result = verifier.verify_candidate(frame, tile, matcher_profile="sift_orb")
# Assert
assert result.decision.accepted is True
assert result.decision.inliers == 25
assert result.reason == "accepted_geometry"
assert result.matcher_profile == "sift_orb"
assert result.homography is not None
assert result.homography["h02"] == 2.0
assert result.homography["h12"] == 3.0
def test_unsafe_candidate_is_rejected_with_reason() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
evidence = _evidence(
candidate=_candidate(freshness_status="stale"),
inliers=6,
mean_reprojection_error_px=8.0,
)
# Act
result = verifier.verify(frame, evidence)
# Assert
assert result.decision.accepted is False
assert result.decision.estimated_pose is None
assert result.decision.rejection_reason == "stale_or_untrusted_provenance"
assert result.reason == "stale_or_untrusted_provenance"
def test_computed_matching_rejects_low_inlier_geometry() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = _frame_with_keypoints()
tile = _tile_with_keypoints(
keypoints=((100.0, 100.0), (12.0, 3.0), (99.0, 88.0), (50.0, 40.0), (6.0, 9.0))
)
# Act
result = verifier.verify_candidate(frame, tile, matcher_profile="sift_orb")
# Assert
assert result.decision.accepted is False
assert result.reason == "low_inliers"
assert result.decision.rejection_reason == "low_inliers"
def test_matcher_benchmark_reports_profile_runtime_and_quality_metrics() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = AnchorFrame(frame_id="frame-1", image_ref="replay/frame-1.jpg")
# Act
report = verifier.benchmark(
frame,
(
_evidence(matcher_profile="aliked_lightglue", runtime_ms=72.5),
_evidence(matcher_profile="sift_orb", inliers=12, runtime_ms=18.0),
),
)
# Assert
assert [result.matcher_profile for result in report.results] == [
"aliked_lightglue",
"sift_orb",
]
assert report.results[0].accepted is True
assert report.results[0].runtime_ms == 72.5
assert report.results[1].accepted is False
assert report.results[1].reason == "low_inliers"
def test_matcher_benchmark_can_run_computed_paths() -> None:
# Arrange
verifier = GeometryGatedAnchorVerifier()
frame = _frame_with_keypoints()
# Act
report = verifier.benchmark_candidates(
frame,
(
_tile_with_keypoints(),
_tile_with_keypoints(
keypoints=((2.0, 3.0), (2.0, 13.0), (2.0, 23.0), (2.0, 33.0), (2.0, 43.0))
),
),
matcher_profile="sift_orb",
)
# Assert
assert report.results[0].accepted is True
assert report.results[0].runtime_ms >= 0.0
assert report.results[1].accepted is False
assert report.results[1].reason == "low_inliers"
@@ -1,76 +0,0 @@
import pytest
from pydantic import ValidationError
from camera_ingest_calibration import (
CalibrationMetadata,
CameraFrameIngestor,
NavigationFrame,
)
def _calibration() -> CalibrationMetadata:
return CalibrationMetadata(
calibration_id="calib-front-1",
camera_model="global-shutter",
image_width_px=1920,
image_height_px=1080,
focal_length_px=840.0,
distortion_model="plumb_bob",
)
def test_valid_frame_packet_contains_metadata_reports_and_normalization_hint() -> None:
# Arrange
frame = NavigationFrame(
frame_id="frame-1",
timestamp_ns=1_000,
image_ref="replay/frame-1.jpg",
mean_luma=0.7,
contrast=0.6,
north_up_degrees=12.5,
)
# Act
packet = CameraFrameIngestor().ingest(frame, _calibration())
# Assert
assert packet.contract.timestamp_ns == 1_000
assert packet.contract.calibration_id == "calib-front-1"
assert packet.quality_report.state == "usable"
assert packet.occlusion_report.state == "clear"
assert packet.normalization_hint.should_normalize_downstream is True
def test_total_occlusion_marks_frame_unusable_for_vio_and_anchor() -> None:
# Arrange
frame = NavigationFrame(
frame_id="frame-blackout",
timestamp_ns=2_000,
image_ref="replay/frame-blackout.jpg",
mean_luma=0.01,
contrast=0.01,
)
# Act
packet = CameraFrameIngestor().ingest(frame, _calibration())
# Assert
assert packet.occlusion_report.state == "total"
assert packet.usable_for_vio is False
assert packet.usable_for_anchor is False
def test_raw_frame_payload_retention_is_rejected() -> None:
# Act
with pytest.raises(ValidationError) as error:
NavigationFrame(
frame_id="frame-raw",
timestamp_ns=3_000,
image_ref="replay/frame-raw.jpg",
mean_luma=0.7,
contrast=0.6,
raw_frame_retained=True,
)
# Assert
assert "references only" in str(error.value)
-64
View File
@@ -1,64 +0,0 @@
from shared.contracts import FdrEvent
from fdr_observability import FdrExportRequest, FdrPayload, InMemoryFlightRecorder
def _event(event_type: str = "anchor") -> FdrEvent:
return FdrEvent(
event_type=event_type,
timestamp_ns=1_000,
component="anchor_verification",
severity="info",
payload_ref="pending",
mission_id="mission-1",
run_id="run-1",
)
def test_valid_event_append_indexes_metadata_and_payload_reference() -> None:
# Arrange
recorder = InMemoryFlightRecorder(segment_limit_bytes=1_000, storage_limit_bytes=2_000)
payload = FdrPayload(ref="fdr://segments/1/payloads/anchor-1.cbor", size_bytes=128)
# Act
result = recorder.append_event(_event(), payload)
# Assert
assert result.appended is True
assert result.event is not None
assert result.event.payload_ref == payload.ref
assert result.segment_id == "segment-0001"
assert recorder.health.status == "ready"
def test_rollover_threshold_records_explicit_rollover_result() -> None:
# Arrange
recorder = InMemoryFlightRecorder(segment_limit_bytes=100, storage_limit_bytes=500)
recorder.append_event(_event("first"), FdrPayload(ref="fdr://payloads/1", size_bytes=80))
# Act
result = recorder.append_event(
_event("second"), FdrPayload(ref="fdr://payloads/2", size_bytes=50)
)
# Assert
assert result.appended is True
assert result.rollover is True
assert result.segment_id == "segment-0002"
def test_export_request_produces_queryable_evidence_artifacts() -> None:
# Arrange
recorder = InMemoryFlightRecorder(segment_limit_bytes=1_000, storage_limit_bytes=2_000)
recorder.append_event(_event(), FdrPayload(ref="fdr://payloads/1", size_bytes=128))
# Act
result = recorder.export(
FdrExportRequest(mission_id="mission-1", run_id="run-1", include_analytics=True)
)
# Assert
assert result.produced is True
assert result.evidence_ref == "fdr://exports/mission-1/run-1/evidence.json"
assert result.analytics_ref == "fdr://exports/mission-1/run-1/analytics.parquet"
assert result.segments[0].event_count == 1
@@ -1,72 +0,0 @@
from shared.contracts import PositionEstimate
from mavlink_gcs_integration import (
FlightControllerTelemetry,
InMemoryMavlinkGateway,
OperatorStatusMessage,
)
def test_telemetry_subscription_emits_normalized_sample() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
telemetry = FlightControllerTelemetry(
timestamp_ns=1_000,
acceleration_mps2=(0.1, 0.2, -9.8),
attitude_rad=(0.01, 0.02, 1.57),
altitude_m=250.0,
airspeed_mps=17.5,
gps_health="lost",
)
# Act
samples = gateway.subscribe_telemetry([telemetry])
# Assert
assert len(samples) == 1
assert samples[0].imu["accel_z"] == -9.8
assert samples[0].attitude["yaw"] == 1.57
assert samples[0].gps_health == "lost"
def test_invalid_gps_input_estimate_is_rejected_without_emission() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
estimate = PositionEstimate(
timestamp_ns=2_000,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=250.0,
covariance_semimajor_m=10.0,
source_label="no_fix",
fix_type=1,
horizontal_accuracy_m=10.0,
anchor_age_ms=0,
)
# Act
result = gateway.emit_gps_input(estimate)
# Assert
assert result.emitted is False
assert result.error is not None
assert result.error.category == "validation"
assert gateway.emitted_gps_inputs == []
def test_operator_status_messages_are_rate_limited_by_text() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
messages = [
OperatorStatusMessage(timestamp_ns=1_000, severity="warning", text="GPS denied"),
OperatorStatusMessage(timestamp_ns=1_500, severity="warning", text="GPS denied"),
OperatorStatusMessage(timestamp_ns=2_100, severity="warning", text="GPS denied"),
]
# Act
result = gateway.emit_status(messages)
# Assert
assert [message.timestamp_ns for message in result.emitted] == [1_000, 2_100]
assert [message.timestamp_ns for message in result.suppressed] == [1_500]
assert len(gateway.emitted_status_messages) == 2
-102
View File
@@ -1,102 +0,0 @@
from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext
from shared.contracts import AnchorDecision, VioStatePacket
def _telemetry() -> TelemetryContext:
return TelemetryContext(
timestamp_ns=1_000_000,
latitude_hint_deg=49.1,
longitude_hint_deg=36.1,
altitude_m=120.0,
)
def _vio_state(**overrides: object) -> VioStatePacket:
payload: dict[str, object] = {
"timestamp_ns": 1_000_000,
"relative_pose": {"x_m": 1.0, "y_m": 0.0, "z_m": 0.0},
"velocity_mps": (12.0, 0.0, 0.0),
"tracking_quality": 0.9,
"covariance_hint": [[1.8, 0.0], [0.0, 1.8]],
}
payload.update(overrides)
return VioStatePacket.model_validate(payload)
def _accepted_anchor() -> AnchorDecision:
return AnchorDecision(
candidate_id="chunk-1",
accepted=True,
estimated_pose={"latitude_deg": 49.2, "longitude_deg": 36.2, "altitude_m": 121.0},
inliers=48,
mean_reprojection_error_px=1.2,
)
def test_vio_state_updates_position_estimate_with_honest_covariance() -> None:
# Arrange
machine = SafetyAnchorStateMachine()
# Act
snapshot = machine.update_vio(_vio_state(), _telemetry())
# Assert
assert snapshot.estimate.source_label == "vo_extrapolated"
assert snapshot.estimate.latitude_deg == 49.1
assert snapshot.estimate.covariance_semimajor_m == 1.8
assert snapshot.estimate.horizontal_accuracy_m >= snapshot.estimate.covariance_semimajor_m
def test_accepted_anchor_corrects_state_and_records_evidence() -> None:
# Arrange
machine = SafetyAnchorStateMachine()
machine.update_vio(_vio_state(), _telemetry())
# Act
snapshot = machine.consider_anchor(_accepted_anchor())
# Assert
assert snapshot.mode == "satellite_anchored"
assert snapshot.estimate.latitude_deg == 49.2
assert snapshot.anchor_evidence is not None
assert snapshot.anchor_evidence.candidate_id == "chunk-1"
def test_blackout_degrades_then_reaches_no_fix_with_monotonic_covariance() -> None:
# Arrange
machine = SafetyAnchorStateMachine(
SafetyStateConfig(dead_reckoning_growth_m=250.0, no_fix_covariance_threshold_m=500.0)
)
machine.update_vio(_vio_state(covariance_hint=[[100.0]]), _telemetry())
# Act
degraded = machine.propagate_blackout(2_000_000)
no_fix = machine.propagate_blackout(3_000_000)
# Assert
assert degraded.mode == "dead_reckoned"
assert degraded.estimate.covariance_semimajor_m == 350.0
assert no_fix.mode == "no_fix"
assert no_fix.estimate.fix_type == 0
assert no_fix.estimate.covariance_semimajor_m > degraded.estimate.covariance_semimajor_m
def test_tile_write_eligibility_requires_trusted_low_covariance_pose() -> None:
# Arrange
machine = SafetyAnchorStateMachine(SafetyStateConfig(tile_write_covariance_max_m=3.0))
machine.update_vio(_vio_state(covariance_hint=[[4.0]]), _telemetry())
# Act
high_covariance = machine.tile_write_eligibility()
machine.consider_anchor(_accepted_anchor())
anchored = machine.tile_write_eligibility()
machine.propagate_blackout(2_000_000)
blackout = machine.tile_write_eligibility()
# Assert
assert high_covariance.eligible is False
assert high_covariance.reason == "covariance_too_high"
assert anchored.eligible is True
assert anchored.reason == "trusted_pose"
assert blackout.eligible is False
assert blackout.reason == "untrusted_source_label"
-96
View File
@@ -1,96 +0,0 @@
from datetime import datetime, timezone
from satellite_service import MissionCachePackage, SatelliteSyncBoundary
from tile_manager import (
GeneratedTileSidecar,
GeneratedTileSyncPackage,
TileManifestEntry,
)
def _manifest_entry() -> TileManifestEntry:
return TileManifestEntry(
tile_id="tile-1",
chunk_id="chunk-1",
crs="EPSG:3857",
meters_per_pixel=0.3,
capture_date="2026-05-01",
expires_at=datetime(2026, 6, 1, tzinfo=timezone.utc),
content_hash="sha256:tile",
expected_content_hash="sha256:tile",
sidecar_hash="sha256:sidecar",
expected_sidecar_hash="sha256:sidecar",
signature_hash="sig:trusted",
provenance="suite-satellite-service",
footprint={"min_lat": 49.0, "max_lat": 49.1},
descriptor_ref="descriptors/chunk-1.vlad",
)
def _generated_package() -> GeneratedTileSyncPackage:
sidecar = GeneratedTileSidecar(
tile_id="generated-1",
parent_frame_id="frame-1",
parent_covariance_m=2.0,
quality_score=0.8,
trust_level="generated",
provenance="nav-camera-generated",
)
return GeneratedTileSyncPackage(
package_ref="generated/mission-1/sync-package.json",
mission_id="mission-1",
manifest_delta=({"tile_id": "generated-1", "trust_level": "generated"},),
sidecars=(sidecar,),
)
def test_pre_flight_import_returns_package_for_tile_manager_validation() -> None:
# Arrange
boundary = SatelliteSyncBoundary()
package = MissionCachePackage(
package_id="pkg-1",
mission_id="mission-1",
manifest_entries=(_manifest_entry(),),
)
# Act
result = boundary.import_mission_cache(package, phase="pre_flight")
# Assert
assert result.ready_for_tile_validation is True
assert result.manifest_entries[0].tile_id == "tile-1"
assert boundary.status().imported_package_ids == ("pkg-1",)
def test_post_flight_upload_records_retryable_failure_for_audit() -> None:
# Arrange
boundary = SatelliteSyncBoundary(uploader=lambda package: "retryable_failure")
# Act
result = boundary.upload_generated_tiles(_generated_package(), phase="post_flight")
# Assert
assert result.upload_record is not None
assert result.upload_record.status == "retryable_failure"
assert result.upload_record.retained_for_retry is True
assert boundary.status().retry_package_refs == ("generated/mission-1/sync-package.json",)
def test_in_flight_sync_is_blocked_without_calling_network_boundary() -> None:
# Arrange
calls: list[str] = []
def uploader(package: GeneratedTileSyncPackage) -> str:
calls.append(package.package_ref)
return "success"
boundary = SatelliteSyncBoundary(uploader=uploader)
# Act
result = boundary.upload_generated_tiles(_generated_package(), phase="in_flight")
# Assert
assert result.upload_record is None
assert result.error is not None
assert result.error.cause == "mid_flight_network_blocked"
assert calls == []
-198
View File
@@ -1,198 +0,0 @@
import json
from pathlib import Path
from satellite_service import (
LocalVprIndexPackage,
LocalVprRetriever,
RelocalizationRequest,
VprDescriptorRecord,
)
def _record(
chunk_id: str = "chunk-1",
tile_id: str = "tile-1",
descriptor: tuple[float, ...] = (1.0, 0.0, 0.0),
freshness_status: str = "fresh",
) -> VprDescriptorRecord:
return VprDescriptorRecord(
chunk_id=chunk_id,
tile_id=tile_id,
descriptor=descriptor,
footprint={"min_lat": 49.0, "max_lat": 49.1, "min_lon": 36.0, "max_lon": 36.1},
freshness_status=freshness_status,
)
def test_valid_local_index_load_reports_ready_status() -> None:
# Arrange
retriever = LocalVprRetriever()
package = LocalVprIndexPackage(package_id="index-1", records=(_record(),))
# Act
readiness = retriever.load_index(package)
# Assert
assert readiness.ready is True
assert readiness.engine == "cpu_faiss"
assert readiness.loaded_records == 1
assert readiness.package_id == "index-1"
assert readiness.descriptor_model == "dinov2_vlad"
def test_local_descriptor_index_package_loads_from_cache_file(tmp_path: Path) -> None:
# Arrange
package_path = tmp_path / "vpr-index.json"
package_path.write_text(
json.dumps(
{
"package_id": "index-file-1",
"engine": "cpu_faiss",
"descriptor_model": "dinov2_vlad",
"records": [
{
"chunk_id": "chunk-file",
"tile_id": "tile-file",
"descriptor": [1.0, 0.0],
"footprint": {
"min_lat": 49.0,
"max_lat": 49.1,
"min_lon": 36.0,
"max_lon": 36.1,
},
"freshness_status": "fresh",
}
],
}
),
encoding="utf-8",
)
retriever = LocalVprRetriever()
# Act
readiness = retriever.load_index_from_path(package_path)
# Assert
assert readiness.ready is True
assert readiness.package_id == "index-file-1"
assert readiness.loaded_records == 1
def test_loaded_index_returns_bounded_candidates_with_freshness() -> None:
# Arrange
retriever = LocalVprRetriever()
retriever.load_index(
LocalVprIndexPackage(
package_id="index-1",
records=(
_record(chunk_id="chunk-best", tile_id="tile-best", descriptor=(1.0, 0.0)),
_record(
chunk_id="chunk-stale",
tile_id="tile-stale",
descriptor=(0.8, 0.2),
freshness_status="stale",
),
),
)
)
request = RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="covariance_growth",
top_k=1,
query_descriptor=(1.0, 0.0),
)
# Act
result = retriever.retrieve(request)
# Assert
assert result.degraded is False
assert result.retrieval_path == "local_descriptor_index"
assert result.latency_ms is not None
assert len(result.candidates) == 1
assert result.candidates[0].chunk_id == "chunk-best"
assert result.candidates[0].tile_id == "tile-best"
assert result.candidates[0].freshness_status == "fresh"
def test_loaded_index_requires_query_descriptor() -> None:
# Arrange
retriever = LocalVprRetriever()
retriever.load_index(LocalVprIndexPackage(package_id="index-1", records=(_record(),)))
request = RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="covariance_growth",
top_k=1,
)
# Act
result = retriever.retrieve(request)
# Assert
assert result.ready is True
assert result.degraded is True
assert result.error is not None
assert result.error.cause == "query_descriptor_missing"
def test_missing_index_degrades_with_explicit_no_candidate_result() -> None:
# Arrange
retriever = LocalVprRetriever()
request = RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="cold_start",
top_k=3,
)
# Act
result = retriever.retrieve(request)
# Assert
assert result.ready is False
assert result.degraded is True
assert result.candidates == ()
assert result.error is not None
assert result.error.cause == "index_not_loaded"
def test_invalid_index_package_degrades_with_explicit_error(tmp_path: Path) -> None:
# Arrange
package_path = tmp_path / "invalid-index.json"
package_path.write_text("{not-json", encoding="utf-8")
retriever = LocalVprRetriever()
# Act
readiness = retriever.load_index_from_path(package_path)
result = retriever.retrieve(
RelocalizationRequest(
frame_id="frame-1",
image_ref="replay/frame-1.jpg",
trigger_reason="cold_start",
top_k=3,
query_descriptor=(1.0, 0.0),
)
)
# Assert
assert readiness.ready is False
assert readiness.error is not None
assert readiness.error.cause == "index_package_invalid"
assert result.ready is False
assert result.degraded is True
assert result.error is not None
assert result.error.cause == "index_package_invalid"
def test_descriptor_fidelity_gate_rejects_large_optimized_delta() -> None:
# Arrange
retriever = LocalVprRetriever()
# Act
report = retriever.verify_descriptor_fidelity((1.0, 0.0), (0.0, 1.0), max_l2_delta=0.1)
# Assert
assert report.accepted is False
assert report.observed_l2_delta > report.max_l2_delta
-137
View File
@@ -1,137 +0,0 @@
from importlib import import_module
from pathlib import Path
COMPONENT_PACKAGES = [
"camera_ingest_calibration",
"vio_adapter",
"safety_anchor_wrapper",
"satellite_service",
"anchor_verification",
"tile_manager",
"mavlink_gcs_integration",
"fdr_observability",
]
SHARED_PACKAGES = [
"shared.contracts",
"shared.geo_geometry",
"shared.time_sync",
"shared.config",
"shared.errors",
"shared.telemetry",
]
REQUIRED_PATHS = [
"src",
"migrations/postgresql/0001_enable_postgis.sql",
"migrations/seed/README.md",
"tests/unit/test_scaffold.py",
"tests/unit/shared/.gitkeep",
"tests/unit/camera_ingest_calibration/.gitkeep",
"tests/unit/vio_adapter/.gitkeep",
"tests/unit/safety_anchor_wrapper/.gitkeep",
"tests/unit/satellite_service/.gitkeep",
"tests/unit/anchor_verification/.gitkeep",
"tests/unit/tile_manager/.gitkeep",
"tests/unit/mavlink_gcs_integration/.gitkeep",
"tests/unit/fdr_observability/.gitkeep",
"src/vio_adapter/native/README.md",
"src/satellite_service/native/README.md",
"src/anchor_verification/native/README.md",
"tests/integration/contracts/.gitkeep",
"tests/blackbox/still_image_geolocation/.gitkeep",
"tests/fixtures/project_60_images/.gitkeep",
"tests/sitl/plane_gps_input/.gitkeep",
"tests/e2e/replay/.gitkeep",
"e2e/replay/run_replay.py",
"e2e/reports/.gitkeep",
"deployment/docker/Dockerfile.runtime",
"deployment/docker/Dockerfile.replay",
"deployment/scripts/collect_evidence.sh",
"config/development/runtime.env",
"config/ci/runtime.env",
"config/jetson/runtime.env",
"config/production/runtime.env.example",
"docker-compose.yml",
"docker-compose.test.yml",
".github/workflows/ci.yml",
".env.example",
".dockerignore",
]
def test_runtime_component_public_modules_are_importable() -> None:
# Act
imported_modules = [
import_module(module_name)
for package_name in COMPONENT_PACKAGES
for module_name in (package_name, f"{package_name}.interfaces", f"{package_name}.types")
]
# Assert
assert len(imported_modules) == len(COMPONENT_PACKAGES) * 3
def test_shared_contract_locations_are_importable() -> None:
# Act
imported_modules = [import_module(package_name) for package_name in SHARED_PACKAGES]
# Assert
assert len(imported_modules) == len(SHARED_PACKAGES)
def test_generated_runtime_data_paths_are_gitkeep_only() -> None:
# Arrange
data_dirs = ["input", "expected", "cache", "fdr", "test-results"]
# Act
missing = [
directory for directory in data_dirs if not Path("data", directory, ".gitkeep").is_file()
]
# Assert
assert missing == []
def test_scaffold_paths_cover_runtime_test_and_evidence_layout() -> None:
# Act
missing = [path for path in REQUIRED_PATHS if not Path(path).exists()]
# Assert
assert missing == []
def test_native_bridge_placeholders_are_component_owned() -> None:
# Act
shared_native_path_exists = Path("src/native").exists()
# Assert
assert shared_native_path_exists is False
def test_ignore_rules_exclude_runtime_payloads_and_secrets() -> None:
# Arrange
required_patterns = [
".env",
"*.pem",
"*.key",
"data/input/*",
"data/cache/*",
"data/fdr/*",
"data/test-results/*",
"*.tlog",
"*.cbor",
"*.mp4",
]
# Act
gitignore = Path(".gitignore").read_text(encoding="utf-8")
dockerignore = Path(".dockerignore").read_text(encoding="utf-8")
missing_from_gitignore = [pattern for pattern in required_patterns if pattern not in gitignore]
missing_from_dockerignore = [
pattern for pattern in required_patterns if pattern not in dockerignore
]
# Assert
assert missing_from_gitignore == []
assert missing_from_dockerignore == []
-137
View File
@@ -1,137 +0,0 @@
from datetime import datetime, timezone
from tile_manager import LocalTileManager, TileGenerationRequest, TileManifestEntry
NOW = datetime(2026, 5, 3, tzinfo=timezone.utc)
def _entry(**overrides: object) -> TileManifestEntry:
payload: dict[str, object] = {
"tile_id": "tile-1",
"chunk_id": "chunk-1",
"crs": "EPSG:3857",
"meters_per_pixel": 0.3,
"capture_date": "2026-05-01",
"expires_at": "2026-06-01T00:00:00+00:00",
"content_hash": "sha256:tile",
"expected_content_hash": "sha256:tile",
"sidecar_hash": "sha256:sidecar",
"expected_sidecar_hash": "sha256:sidecar",
"signature_hash": "sig:trusted",
"provenance": "suite-satellite-service",
"footprint": {"min_lat": 49.0, "max_lat": 50.0},
"descriptor_ref": "descriptors/chunk-1.vlad",
}
payload.update(overrides)
return TileManifestEntry.model_validate(payload)
def test_valid_cache_manifest_activates_trusted_records() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
# Act
report = manager.validate_cache([_entry()])
# Assert
assert report.activated is True
assert report.decisions[0].accepted is True
assert report.trusted_records[0].trust_level == "trusted"
def test_tampered_or_stale_tile_is_rejected_with_auditable_reason() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
tampered = _entry(tile_id="tile-tampered", content_hash="sha256:bad")
stale = _entry(
tile_id="tile-stale",
chunk_id="chunk-stale",
expires_at="2026-05-01T00:00:00+00:00",
)
# Act
report = manager.validate_cache([tampered, stale])
# Assert
assert report.activated is False
assert [decision.reason for decision in report.decisions] == [
"content_hash_mismatch",
"stale",
]
def test_tile_metadata_lookup_returns_record_or_explicit_rejection() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
manager.validate_cache([_entry()])
# Act
found = manager.get_tile_metadata("chunk-1")
missing = manager.get_tile_metadata("missing")
# Assert
assert found.found is True
assert found.record is not None
assert found.descriptor_ref == "descriptors/chunk-1.vlad"
assert missing.found is False
assert missing.error is not None
assert missing.error.category == "validation"
def _generation_request(**overrides: object) -> TileGenerationRequest:
payload: dict[str, object] = {
"mission_id": "mission-1",
"frame_id": "frame-1",
"image_ref": "replay/frame-1.jpg",
"timestamp_ns": 10_000,
"parent_covariance_m": 2.5,
"frame_usable": True,
"quality_score": 0.8,
"footprint": {"min_lat": 49.0, "max_lat": 49.1},
"source_provenance": "nav-camera-generated",
}
payload.update(overrides)
return TileGenerationRequest.model_validate(payload)
def test_eligible_frame_stages_generated_cog_and_sidecar() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
# Act
candidate = manager.orthorectify_frame(_generation_request())
# Assert
assert candidate.accepted is True
assert candidate.cog_ref == "generated/mission-1/generated-mission-1-frame-1.cog.tif"
assert candidate.sidecar is not None
assert candidate.sidecar.trust_level == "generated"
assert candidate.sidecar.parent_covariance_m == 2.5
def test_high_covariance_generated_tile_write_is_rejected() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
# Act
candidate = manager.orthorectify_frame(_generation_request(parent_covariance_m=7.5))
# Assert
assert candidate.accepted is False
assert candidate.rejection_reason == "covariance_too_high"
assert manager.package_sync("mission-1").sidecars == ()
def test_sync_package_includes_manifest_delta_sidecar_covariance_and_trust_level() -> None:
# Arrange
manager = LocalTileManager(trusted_signature_hashes={"sig:trusted"}, now=NOW)
manager.orthorectify_frame(_generation_request())
# Act
package = manager.package_sync("mission-1")
# Assert
assert package.package_ref == "generated/mission-1/sync-package.json"
assert package.sidecars[0].parent_covariance_m == 2.5
assert package.manifest_delta[0]["trust_level"] == "generated"
assert package.manifest_delta[0]["parent_covariance_m"] == 2.5
-237
View File
@@ -1,237 +0,0 @@
import pytest
from pydantic import ValidationError
from shared.contracts import FramePacket, TelemetrySample
from vio_adapter import (
LocalVioAdapter,
NativeVioBackend,
VioBackendEstimate,
VioInputPacket,
VioRuntimeConfig,
create_vio_adapter,
)
class RecordingNativeRunner:
def __init__(self) -> None:
self.initialized = False
self.estimate_calls = 0
def initialize(self) -> None:
self.initialized = True
def estimate(
self,
frame: FramePacket,
telemetry_window: tuple[TelemetrySample, ...],
) -> VioBackendEstimate:
self.estimate_calls += 1
return VioBackendEstimate(
timestamp_ns=frame.timestamp_ns,
relative_pose={"x_m": 12.0, "y_m": -1.5, "z_m": 0.2, "yaw_rad": 0.3},
velocity_mps=(4.0, 0.5, 0.0),
tracking_quality=0.77,
bias_estimate={"sample_count": float(len(telemetry_window)), "gyro_bias": 0.01},
covariance_hint=[[0.4, 0.0, 0.0], [0.0, 0.4, 0.0], [0.0, 0.0, 0.8]],
)
class FailingNativeRunner:
def __init__(self, fail_on: str) -> None:
self._fail_on = fail_on
def initialize(self) -> None:
if self._fail_on == "initialize":
raise RuntimeError("engine package missing")
def estimate(
self,
frame: FramePacket,
telemetry_window: tuple[TelemetrySample, ...],
) -> VioBackendEstimate:
if self._fail_on == "estimate":
raise RuntimeError("engine lost tracking")
return VioBackendEstimate(
timestamp_ns=frame.timestamp_ns,
relative_pose={"x_m": 0.0},
velocity_mps=(0.0, 0.0, 0.0),
tracking_quality=1.0,
)
def _frame(**overrides: object) -> FramePacket:
payload: dict[str, object] = {
"frame_id": "frame-1",
"timestamp_ns": 1_000_000,
"image_ref": "replay/frame-1.jpg",
"calibration_id": "calib-1",
"occlusion": "clear",
"quality": 0.85,
}
payload.update(overrides)
return FramePacket.model_validate(payload)
def _telemetry(timestamp_ns: int = 1_000_000) -> TelemetrySample:
return TelemetrySample(
timestamp_ns=timestamp_ns,
imu={"accel_x": 0.1, "accel_y": 0.0, "accel_z": 9.8},
attitude={"roll": 0.0, "pitch": 0.01, "yaw": 0.02},
altitude_m=120.0,
airspeed_mps=24.0,
gps_health="lost",
)
def test_valid_synchronized_packet_emits_vio_state() -> None:
# Arrange
adapter = LocalVioAdapter()
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.error is None
assert result.state_packet is not None
assert result.state_packet.timestamp_ns == 1_000_000
assert result.state_packet.tracking_quality == 0.85
assert result.health.state == "ready"
def test_configured_native_backend_path_emits_vio_state() -> None:
# Arrange
runner = RecordingNativeRunner()
adapter = LocalVioAdapter(backend=NativeVioBackend(runner, backend_name="basalt"))
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert runner.initialized is True
assert runner.estimate_calls == 1
assert result.error is None
assert result.state_packet is not None
assert result.state_packet.relative_pose["x_m"] == 12.0
assert result.state_packet.velocity_mps == (4.0, 0.5, 0.0)
assert result.health.backend_name == "basalt"
assert result.processing_latency_ms is not None
def test_production_profile_selects_native_runtime_path() -> None:
# Arrange
runner = RecordingNativeRunner()
adapter = create_vio_adapter(
VioRuntimeConfig(environment="production"),
native_runner_factory=lambda: runner,
)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert runner.initialized is True
assert runner.estimate_calls == 1
assert result.error is None
assert result.state_packet is not None
assert result.health.backend_name == "basalt"
def test_production_profile_without_installed_native_runtime_fails_closed() -> None:
# Arrange
adapter = create_vio_adapter(VioRuntimeConfig(environment="production"))
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is None
assert result.health.state == "failed"
assert result.error is not None
assert result.error.cause == "backend_initialization_failed"
assert "unable to load BASALT runtime" in result.error.message
def test_replay_mode_is_explicit_and_not_valid_for_production() -> None:
# Arrange
replay_config = VioRuntimeConfig(environment="development", mode="replay")
adapter = create_vio_adapter(replay_config)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.error is None
assert result.state_packet is not None
assert result.health.backend_name == "replay_vio"
with pytest.raises(ValidationError, match="require native runtime mode"):
VioRuntimeConfig(environment="production", mode="replay")
def test_native_backend_initialization_failure_sets_failed_health() -> None:
# Arrange
adapter = LocalVioAdapter(
backend=NativeVioBackend(FailingNativeRunner("initialize"), backend_name="basalt")
)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is None
assert result.health.state == "failed"
assert result.error is not None
assert result.error.cause == "backend_initialization_failed"
def test_native_backend_runtime_failure_sets_failed_health() -> None:
# Arrange
adapter = LocalVioAdapter(
backend=NativeVioBackend(FailingNativeRunner("estimate"), backend_name="basalt")
)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is None
assert result.health.state == "failed"
assert result.error is not None
assert result.error.cause == "backend_runtime_failed"
def test_timestamp_mismatch_is_explicit_validation_error() -> None:
# Arrange
adapter = LocalVioAdapter(timestamp_tolerance_ns=1_000)
packet = VioInputPacket(frame=_frame(), telemetry_samples=(_telemetry(2_000_000),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is None
assert result.error is not None
assert result.error.component == "vio_adapter"
assert result.error.cause == "gap_exceeded"
assert result.health.state == "degraded"
def test_tracking_loss_degrades_health_without_emitting_absolute_position() -> None:
# Arrange
adapter = LocalVioAdapter(degraded_quality_threshold=0.35)
packet = VioInputPacket(frame=_frame(quality=0.2), telemetry_samples=(_telemetry(),))
# Act
result = adapter.process(packet)
# Assert
assert result.state_packet is not None
assert result.health.state == "degraded"
assert "latitude_deg" not in result.state_packet.model_dump()
assert "longitude_deg" not in result.state_packet.model_dump()
-1
View File
@@ -1 +0,0 @@
-1
View File
@@ -1 +0,0 @@