[AZ-423] [AZ-427] Add FT-P-19 + FT-N-05 blackbox tests

Implement the AC-8.6 (top-K=10 retrieval scale-ratio + scene-change
PARTIAL) and AC-8.2 / AC-NEW-6 (stale aged-tile rejection) blackbox
scenarios.

AZ-423 (FT-P-19, 3pt) helpers + scenario:
- retrieval_evaluator.py — top-K within-distance evaluator (60 stills
  vs 100 m budget), scene-change PARTIAL recorder (always emits
  PARTIAL on the 2 _gmaps.png pairs), FDR record projectors, CSV
  writers.
- tests/positive/test_ft_p_19_sat_reloc_scale.py (6 parametrised
  variants).

AZ-427 (FT-N-05, 2pt) helpers + scenario:
- aged_tile_rejection_evaluator.py — Signal A (stale rejection at
  load) + Signal B (per-frame downgrade) decision matrix, reuses
  ALLOWED_SOURCE_LABELS from estimate_schema.
- tests/negative/test_ft_n_05_stale_tile_rejection.py (12 parametrised
  variants: FC × VIO × {7mo/active-conflict, 13mo/rear}).

48 new unit tests cover every helper branch. Both scenarios skip
when sitl_replay_ready is false and fail loudly when fixture records
are missing.

Per-batch review: PASS_WITH_WARNINGS (2 Low — production-dependency
surface, FDR-kind constant duplication).
Cumulative review 82-84: PASS (2 Low carry-over / hygiene candidate).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-17 15:43:06 +03:00
parent a22028087f
commit f25cae4a82
13 changed files with 2005 additions and 3 deletions
@@ -0,0 +1,287 @@
"""Unit tests for ``runner.helpers.aged_tile_rejection_evaluator`` (AZ-427).
Pure-logic AC-8.2 / AC-NEW-6 coverage for FT-N-05.
"""
from __future__ import annotations
from typing import Any
import pytest
from runner.helpers import aged_tile_rejection_evaluator as ate
def _emission(frame_id: str = "AD000001.jpg", label: str = "satellite_anchored") -> ate.SourceLabelEmission:
    """Build a ``SourceLabelEmission`` for tests.

    The defaults describe a single anchored frame, so callers only spell out
    the field they want to vary.
    """
    observation = ate.SourceLabelEmission(label=label, frame_id=frame_id)
    return observation
def _rejection(tile_id: str = "tile_001", reason: str = "stale") -> ate.StaleTileRejection:
    """Build a ``StaleTileRejection`` for tests; defaults to a stale ``tile_001``."""
    event = ate.StaleTileRejection(reason=reason, tile_id=tile_id)
    return event
# ─────────────────────── Signal A (load-time rejection) ───────────────────────
def test_signal_a_stale_rejected_no_emissions_passes() -> None:
    """A stale load-time rejection with zero emissions passes via Signal A alone."""
    # Arrange — the SUT refused the aged tiles at load; nothing else ran
    stale_events = [_rejection()]

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="7mo_active_conflict",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=[],
        rejections=stale_events,
    )

    # Assert
    assert outcome.passes
    assert outcome.has_stale_rejection
    assert outcome.signal_a_holds
    assert not outcome.signal_b_holds
    assert outcome.anchored_count == 0
def test_signal_a_stale_rejected_plus_downgrade_passes() -> None:
    """Stale rejection plus exclusively downgraded emissions still passes."""
    # Arrange — rejected at load AND every later emission is downgraded
    downgraded = [
        _emission(label=downgrade_label)
        for downgrade_label in ("visual_propagated", "dead_reckoned")
    ]

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="13mo_rear",
        fixture="synth-age-13mo",
        sector="rear",
        emissions=downgraded,
        rejections=[_rejection()],
    )

    # Assert
    assert outcome.signal_a_holds
    assert outcome.passes
def test_signal_a_stale_rejected_but_one_anchored_fails() -> None:
    """One anchored emission is fatal even when a stale rejection was logged."""
    # Arrange — stale rejection present, but one frame slipped through anchored
    anchored_frame = _emission(label="satellite_anchored")
    downgraded_frame = _emission(label="visual_propagated")

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=[anchored_frame, downgraded_frame],
        rejections=[_rejection()],
    )

    # Assert
    assert outcome.anchored_count == 1
    assert not outcome.signal_a_holds  # the anchored emission voids Signal A
    assert not outcome.passes
# ─────────────────────── Signal B (per-frame downgrade) ───────────────────────
def test_signal_b_all_downgraded_no_rejection_passes() -> None:
    """With no rejection logged, 60 downgraded frames pass via Signal B."""
    # Arrange — tiles loaded, every emission downgraded
    frames = []
    for i in range(60):
        frames.append(_emission(f"f_{i}", label="visual_propagated"))

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=frames,
        rejections=[],
    )

    # Assert
    assert outcome.signal_b_holds
    assert not outcome.signal_a_holds
    assert outcome.passes
def test_signal_b_mixed_downgrade_labels_pass() -> None:
    """Any mix of the two downgrade labels satisfies Signal B."""
    # Arrange — visual_propagated + dead_reckoned mix
    labelled_frames = (
        ("f_1", "visual_propagated"),
        ("f_2", "dead_reckoned"),
        ("f_3", "visual_propagated"),
    )
    frames = [_emission(frame, label=label) for frame, label in labelled_frames]

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=frames,
        rejections=[],
    )

    # Assert
    assert outcome.passes
def test_signal_b_one_anchored_among_downgrades_fails() -> None:
    """A single anchored frame among 59 downgraded ones fails the sub-case."""
    # Arrange — 59 downgraded + 1 anchored = FAIL
    downgraded = [_emission(f"f_{i}", label="visual_propagated") for i in range(59)]
    frames = downgraded + [_emission("f_60", label="satellite_anchored")]

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=frames,
        rejections=[],
    )

    # Assert
    assert outcome.anchored_count == 1
    assert not outcome.passes
# ─────────────────────── degenerate inputs ───────────────────────
def test_no_emissions_no_rejection_fails() -> None:
    """No evidence at all (no emissions, no rejection) must not count as a pass."""
    # Arrange — we can't conclude the freshness gate fired at all
    no_frames: list = []
    no_events: list = []

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=no_frames,
        rejections=no_events,
    )

    # Assert
    assert not outcome.signal_a_holds
    assert not outcome.signal_b_holds
    assert not outcome.passes
def test_illegal_label_fails_regardless_of_signals() -> None:
    """An unknown source label fails the run even with a stale rejection present."""
    # Arrange — a label outside the FT-P-03 set is a hard fail
    bogus_frame = _emission(label="garbage")

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=[bogus_frame],
        rejections=[_rejection()],
    )

    # Assert — the offending (frame_id, label) pair is surfaced for diagnosis
    assert outcome.illegal_labels == (("AD000001.jpg", "garbage"),)
    assert not outcome.passes
def test_non_stale_rejection_ignored() -> None:
    """A rejection whose reason is not ``"stale"`` contributes nothing to Signal A."""
    # Arrange — reason != "stale" must not satisfy signal A
    unrelated_event = _rejection(reason="below_floor")

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=[],
        rejections=[unrelated_event],
    )

    # Assert
    assert not outcome.has_stale_rejection
    assert not outcome.passes
def test_multiple_stale_rejections_collected() -> None:
    """Every stale rejection's tile id is reported, in input order."""
    # Arrange
    stale_events = [_rejection(tile) for tile in ("t_a", "t_b")]

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-7mo",
        sector="active_conflict",
        emissions=[],
        rejections=stale_events,
    )

    # Assert
    assert outcome.stale_rejections == ("t_a", "t_b")
def test_report_anchored_frame_ids_preserved() -> None:
    """Anchored emissions keep their frame ids in the report, in input order."""
    # Arrange — anchored emissions must surface their frame IDs for diagnosis
    first = _emission("AD000010.jpg", label="satellite_anchored")
    second = _emission("AD000020.jpg", label="satellite_anchored")

    # Act
    outcome = ate.evaluate_aged_tile_rejection(
        sub_case_id="x",
        fixture="synth-age-13mo",
        sector="rear",
        emissions=[first, second],
        rejections=[],
    )

    # Assert
    assert outcome.anchored_frame_ids == ("AD000010.jpg", "AD000020.jpg")
# ─────────────────────── project_stale_rejection_payload ───────────────────────
def test_project_stale_rejection_happy_path() -> None:
    """A well-formed stale payload projects onto a rejection carrying its tile id."""
    # Arrange
    payload = {"tile_id": "t_a", "reason": "stale"}

    # Act
    projected = ate.project_stale_rejection_payload(payload)

    # Assert
    assert projected is not None
    assert projected.tile_id == "t_a"
def test_project_stale_rejection_id_alias_accepted() -> None:
    """The ``id`` key is accepted as an alias when ``tile_id`` is absent."""
    # Arrange
    payload = {"id": "t_b", "reason": "stale"}

    # Act
    projected = ate.project_stale_rejection_payload(payload)

    # Assert
    assert projected is not None
    assert projected.tile_id == "t_b"
def test_project_stale_rejection_wrong_reason_returns_none() -> None:
    """A payload whose reason is not ``"stale"`` is dropped."""
    # Arrange
    payload = {"tile_id": "t_a", "reason": "below_floor"}

    # Act
    projected = ate.project_stale_rejection_payload(payload)

    # Assert
    assert projected is None
def test_project_stale_rejection_non_dict_returns_none() -> None:
    """Non-dict payloads (``None``, a bare string) are rejected."""
    # Act / Assert
    for malformed in (None, "nope"):
        assert ate.project_stale_rejection_payload(malformed) is None
def test_project_stale_rejection_missing_id_returns_none() -> None:
    """A stale payload without ``tile_id`` or ``id`` is rejected."""
    # Arrange
    payload = {"reason": "stale"}

    # Act / Assert
    assert ate.project_stale_rejection_payload(payload) is None
def test_project_stale_rejection_id_not_string_returns_none() -> None:
    """A non-string tile id is rejected rather than coerced."""
    # Arrange
    payload = {"tile_id": 42, "reason": "stale"}

    # Act / Assert
    assert ate.project_stale_rejection_payload(payload) is None
# ─────────────────────── iter_stale_rejection_payloads ───────────────────────
class _StubRecord:
def __init__(self, record_type: str, payload: Any) -> None:
self.record_type = record_type
self.payload = payload
def test_iter_stale_rejection_payloads_filters_record_type() -> None:
    """Only ``tile-load-rejected`` records with a stale reason are yielded, in order."""
    # Arrange — two stale rejections, one other record kind, one non-stale rejection
    stream = [
        _StubRecord("tile-load-rejected", {"tile_id": "t_a", "reason": "stale"}),
        _StubRecord("retrieval-topk", {"image_id": "AD000001"}),
        _StubRecord("tile-load-rejected", {"tile_id": "t_b", "reason": "below_floor"}),
        _StubRecord("tile-load-rejected", {"tile_id": "t_c", "reason": "stale"}),
    ]

    # Act
    yielded_ids = [event.tile_id for event in ate.iter_stale_rejection_payloads(stream)]

    # Assert
    assert yielded_ids == ["t_a", "t_c"]
# ─────────────────────── AGED_FIXTURE_SECTOR_BINDINGS ───────────────────────
def test_aged_fixture_sector_bindings_match_spec() -> None:
    """The fixture → sector bindings match the FT-N-05 AC-8.2 sub-case table."""
    # Arrange — frozen contract from FT-N-05 AC-8.2 sub-case table
    expected = {
        "synth-age-7mo": ate.SECTOR_ACTIVE_CONFLICT,
        "synth-age-13mo": ate.SECTOR_REAR,
    }

    # Act
    actual = dict(ate.AGED_FIXTURE_SECTOR_BINDINGS)

    # Assert
    assert actual == expected
@@ -0,0 +1,389 @@
"""Unit tests for ``runner.helpers.retrieval_evaluator`` (AZ-423).
Pure-logic AC-8.6 coverage for FT-P-19 — the scenario in
``e2e/tests/positive/test_ft_p_19_sat_reloc_scale.py`` exercises the
same helpers end-to-end when the SITL fixture is prepared; this file
covers them in isolation.
"""
from __future__ import annotations
import csv
from pathlib import Path
from typing import Any
import pytest
from runner.helpers import retrieval_evaluator as re_
# Reference ground-truth centre (degrees) shared by every query/candidate
# fixture in this module.
DERKACHI_LAT, DERKACHI_LON = 48.275292, 37.385220
def _candidate(
    tile_id: str = "tile_001",
    lat: float = DERKACHI_LAT,
    lon: float = DERKACHI_LON,
) -> re_.CandidateTile:
    """Build a ``CandidateTile``; by default it sits exactly on the reference centre."""
    tile = re_.CandidateTile(
        tile_id=tile_id,
        centre_lat_deg=lat,
        centre_lon_deg=lon,
    )
    return tile
def _query(
    image_id: str = "AD000001",
    *,
    true_lat: float = DERKACHI_LAT,
    true_lon: float = DERKACHI_LON,
    candidates: tuple[re_.CandidateTile, ...] = (),
) -> re_.TopKQuery:
    """Build a ``TopKQuery``; defaults give the reference centre and no candidates."""
    fields = {
        "image_id": image_id,
        "true_centre_lat_deg": true_lat,
        "true_centre_lon_deg": true_lon,
        "candidates": candidates,
    }
    return re_.TopKQuery(**fields)
# ─────────────────────── evaluate_top_k_within_distance ───────────────────────
def test_evaluate_top_k_one_candidate_close_passes() -> None:
    """A lone candidate sitting on the ground-truth centre satisfies AC-1."""
    # Arrange — single candidate exactly at GT
    on_target = _query(candidates=(_candidate(),))

    # Act
    summary = re_.evaluate_top_k_within_distance([on_target], expected_image_count=1)

    # Assert
    assert summary.pass_count == 1
    assert summary.passes
    assert summary.entries[0].min_distance_m == pytest.approx(0.0, abs=0.01)
def test_evaluate_top_k_all_candidates_far_fails() -> None:
    """A query whose only candidate lies beyond the tolerance fails."""
    # Arrange — candidate ~ 1 km east at this latitude
    distant = _candidate(tile_id="far", lat=DERKACHI_LAT, lon=DERKACHI_LON + 0.0135)
    lonely_query = _query(candidates=(distant,))

    # Act
    summary = re_.evaluate_top_k_within_distance([lonely_query], expected_image_count=1)

    # Assert
    entry = summary.entries[0]
    assert not summary.passes
    assert entry.pass_distance is False
    assert (entry.min_distance_m or 0) > 100.0
def test_evaluate_top_k_one_close_candidate_among_far_passes() -> None:
    """One close tile anywhere in the top-K is enough, whatever its rank."""
    # Arrange — 9 far + 1 close in top-K (any rank passes)
    far_tiles = tuple(
        _candidate(tile_id=f"far_{i}", lon=DERKACHI_LON + 0.01 * (i + 1))
        for i in range(9)
    )
    close_tile = _candidate(tile_id="close", lat=DERKACHI_LAT, lon=DERKACHI_LON)
    mixed_query = _query(candidates=(*far_tiles, close_tile))

    # Act
    summary = re_.evaluate_top_k_within_distance([mixed_query], expected_image_count=1)

    # Assert
    assert summary.passes
def test_evaluate_top_k_empty_candidates_fails() -> None:
    """A query with no candidates fails and reports no minimum distance."""
    # Arrange
    empty_query = _query(candidates=())

    # Act
    summary = re_.evaluate_top_k_within_distance([empty_query], expected_image_count=1)

    # Assert
    entry = summary.entries[0]
    assert entry.candidate_count == 0
    assert entry.min_distance_m is None
    assert not summary.passes
def test_evaluate_top_k_short_query_count_fails_aggregate() -> None:
    """Fewer queries than expected fails the aggregate even if each one passes."""
    # Arrange — 1 passing query but expected_image_count=60
    only_query = _query(candidates=(_candidate(),))

    # Act
    summary = re_.evaluate_top_k_within_distance([only_query], expected_image_count=60)

    # Assert
    assert summary.pass_count == 1
    assert not summary.passes
def test_evaluate_top_k_invalid_tolerance_raises() -> None:
    """A zero tolerance is rejected with a ``ValueError`` naming the parameter."""
    # Arrange
    queries = [_query(candidates=(_candidate(),))]

    # Act / Assert
    with pytest.raises(ValueError, match="max_distance_m"):
        re_.evaluate_top_k_within_distance(queries, max_distance_m=0)
def test_evaluate_top_k_custom_tolerance() -> None:
    """The same ~200 m offset fails at 100 m tolerance and passes at 250 m."""
    # Arrange — candidate 200m east; default 100m fails, custom 250m passes
    offset_tile = _candidate(lat=DERKACHI_LAT, lon=DERKACHI_LON + 0.0027)
    queries = [_query(candidates=(offset_tile,))]

    # Act
    by_tolerance = {
        tolerance: re_.evaluate_top_k_within_distance(
            queries, max_distance_m=tolerance, expected_image_count=1
        )
        for tolerance in (100, 250)
    }

    # Assert
    assert not by_tolerance[100].passes
    assert by_tolerance[250].passes
def test_evaluate_top_k_aggregate_60_all_pass() -> None:
    """Sixty queries that each contain a close candidate pass the aggregate."""
    # Arrange — 60 queries, each with one close candidate
    queries = []
    for i in range(1, 61):
        queries.append(_query(image_id=f"AD0000{i:02d}", candidates=(_candidate(),)))

    # Act
    summary = re_.evaluate_top_k_within_distance(queries, expected_image_count=60)

    # Assert
    assert summary.pass_count == 60
    assert summary.passes
def test_evaluate_top_k_aggregate_60_one_fail() -> None:
    """One failing image out of sixty fails the aggregate and is listed as failing."""
    # Arrange — 59 pass + 1 fail
    passing = [
        _query(image_id=f"AD0000{i:02d}", candidates=(_candidate(),))
        for i in range(1, 60)
    ]
    failing = _query(image_id="AD000060", candidates=())
    queries = [*passing, failing]

    # Act
    summary = re_.evaluate_top_k_within_distance(queries, expected_image_count=60)

    # Assert
    assert summary.pass_count == 59
    assert len(summary.failing_entries) == 1
    assert not summary.passes
# ─────────────────────── evaluate_scene_change_subset ───────────────────────
def test_evaluate_scene_change_both_matched_still_partial() -> None:
    """Even with both pairs matched the subset is labelled PARTIAL."""
    # Arrange — both pairs matched
    first = re_.SceneChangeMatch(image_id="AD000001", matched=True, inlier_count=120)
    second = re_.SceneChangeMatch(image_id="AD000002", matched=True, inlier_count=98)

    # Act
    summary = re_.evaluate_scene_change_subset([first, second])

    # Assert
    assert summary.matched_count == 2
    assert summary.coverage_complete
    assert summary.overall_label == re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL
def test_evaluate_scene_change_zero_matched_still_partial() -> None:
    """With neither pair matched the subset label is still PARTIAL."""
    # Arrange
    unmatched = [
        re_.SceneChangeMatch(image_id=image_id, matched=False, inlier_count=0)
        for image_id in ("AD000001", "AD000002")
    ]

    # Act
    summary = re_.evaluate_scene_change_subset(unmatched)

    # Assert
    assert summary.overall_label == re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL
def test_evaluate_scene_change_one_image_only_coverage_incomplete() -> None:
    """Observing only one of the two pairs marks coverage incomplete."""
    # Arrange
    lone_match = re_.SceneChangeMatch(image_id="AD000001", matched=True, inlier_count=120)

    # Act
    summary = re_.evaluate_scene_change_subset([lone_match])

    # Assert
    assert not summary.coverage_complete
    # PARTIAL label still set (decoupled from coverage)
    assert summary.overall_label == re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL
def test_evaluate_scene_change_empty_coverage_incomplete() -> None:
    """No scene-change records at all means coverage is incomplete."""
    # Arrange
    no_matches: list = []

    # Act
    summary = re_.evaluate_scene_change_subset(no_matches)

    # Assert
    assert not summary.coverage_complete
def test_evaluate_scene_change_extra_image_ids_coverage_incomplete() -> None:
    """An unexpected image id in place of an expected one leaves coverage incomplete."""
    # Arrange — image set that doesn't match expected pair ids
    unexpected = re_.SceneChangeMatch(image_id="AD000099", matched=True, inlier_count=120)
    expected = re_.SceneChangeMatch(image_id="AD000002", matched=True, inlier_count=80)

    # Act
    summary = re_.evaluate_scene_change_subset([unexpected, expected])

    # Assert
    assert not summary.coverage_complete
# ─────────────────────── CSV writers ───────────────────────
def test_write_top_k_csv_round_trip(tmp_path: Path) -> None:
    """Write a two-image top-K report to CSV and check the rows read back.

    Covers the header row, a passing image, and a candidate-less image whose
    ``min_distance_m`` column must be empty.
    """
    # Arrange
    out = tmp_path / "topk.csv"
    queries = [
        _query(image_id="AD000001", candidates=(_candidate(),)),
        _query(image_id="AD000002", candidates=()),
    ]
    report = re_.evaluate_top_k_within_distance(queries, expected_image_count=2)

    # Act
    re_.write_top_k_csv(out, report)
    # Read back inside a context manager so the handle is closed
    # deterministically (the bare ``out.open()`` leaked it), and pass
    # ``newline=""`` as the csv module documentation requires for file objects.
    with out.open(encoding="utf-8", newline="") as handle:
        rows = list(csv.reader(handle))

    # Assert
    assert rows[0] == list(re_.TOP_K_CSV_HEADER)
    assert rows[1][0] == "AD000001"
    assert rows[1][3] == "true"
    assert rows[2][0] == "AD000002"
    assert rows[2][2] == ""  # min_distance_m is None when no candidates
    assert rows[2][3] == "false"
def test_write_scene_change_csv_round_trip(tmp_path: Path) -> None:
    """Write a scene-change report to CSV and check the rows read back.

    Covers the header row, a matched pair with an inlier count, and an
    unmatched pair whose missing inlier count must serialise as an empty cell.
    """
    # Arrange
    out = tmp_path / "scene_change.csv"
    matches = [
        re_.SceneChangeMatch(image_id="AD000001", matched=True, inlier_count=120),
        re_.SceneChangeMatch(image_id="AD000002", matched=False, inlier_count=None),
    ]
    report = re_.evaluate_scene_change_subset(matches)

    # Act
    re_.write_scene_change_csv(out, report)
    # Read back inside a context manager so the handle is closed
    # deterministically (the bare ``out.open()`` leaked it), and pass
    # ``newline=""`` as the csv module documentation requires for file objects.
    with out.open(encoding="utf-8", newline="") as handle:
        rows = list(csv.reader(handle))

    # Assert
    assert rows[0] == list(re_.SCENE_CHANGE_CSV_HEADER)
    assert rows[1] == ["AD000001", "true", "120", re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL]
    assert rows[2] == ["AD000002", "false", "", re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL]
def test_write_top_k_csv_missing_parent_dir_raises(tmp_path: Path) -> None:
    """Writing into a directory that does not exist raises ``OSError``."""
    # Arrange — the "nope" parent directory is never created
    missing_dir_target = tmp_path / "nope" / "topk.csv"
    queries = [_query(candidates=(_candidate(),))]
    summary = re_.evaluate_top_k_within_distance(queries, expected_image_count=1)

    # Act / Assert
    with pytest.raises(OSError):
        re_.write_top_k_csv(missing_dir_target, summary)
# ─────────────────────── project_topk_record_to_query ───────────────────────
def test_project_topk_happy_path() -> None:
    """A well-formed ``retrieval-topk`` payload projects onto a two-candidate query."""
    # Arrange
    tile_a = {"tile_id": "t_a", "centre_lat_deg": DERKACHI_LAT, "centre_lon_deg": DERKACHI_LON}
    tile_b = {"tile_id": "t_b", "centre_lat_deg": DERKACHI_LAT + 0.001, "centre_lon_deg": DERKACHI_LON}
    payload = {"image_id": "AD000001", "candidates": [tile_a, tile_b]}

    # Act
    projected = re_.project_topk_record_to_query(
        payload,
        true_centre_lat_deg=DERKACHI_LAT,
        true_centre_lon_deg=DERKACHI_LON,
    )

    # Assert
    assert projected is not None
    assert projected.image_id == "AD000001"
    assert len(projected.candidates) == 2
    assert projected.candidates[0].tile_id == "t_a"
def test_project_topk_skips_malformed_candidates() -> None:
    """Malformed candidate entries are skipped; the well-formed one survives."""
    # Arrange — one good candidate followed by three differently broken ones
    well_formed = {"tile_id": "ok", "centre_lat_deg": DERKACHI_LAT, "centre_lon_deg": DERKACHI_LON}
    malformed = [
        "not a dict",
        {"tile_id": "missing_lat", "centre_lon_deg": DERKACHI_LON},
        {"tile_id": "wrong_type", "centre_lat_deg": "stringy", "centre_lon_deg": DERKACHI_LON},
    ]
    payload = {"image_id": "AD000002", "candidates": [well_formed, *malformed]}

    # Act
    projected = re_.project_topk_record_to_query(
        payload,
        true_centre_lat_deg=DERKACHI_LAT,
        true_centre_lon_deg=DERKACHI_LON,
    )

    # Assert
    assert projected is not None
    surviving_ids = [tile.tile_id for tile in projected.candidates]
    assert surviving_ids == ["ok"]
def test_project_topk_non_dict_payload_returns_none() -> None:
    """A non-dict payload cannot be projected."""
    # Act
    projected = re_.project_topk_record_to_query("not a dict", 0, 0)  # type: ignore[arg-type]

    # Assert
    assert projected is None
def test_project_topk_missing_image_id_returns_none() -> None:
    """A payload without ``image_id`` cannot be projected."""
    # Arrange
    payload = {"candidates": []}

    # Act / Assert
    assert re_.project_topk_record_to_query(payload, 0, 0) is None
def test_project_topk_missing_candidates_returns_none() -> None:
    """A payload without ``candidates`` cannot be projected."""
    # Arrange
    payload = {"image_id": "AD000001"}

    # Act / Assert
    assert re_.project_topk_record_to_query(payload, 0, 0) is None
# ─────────────────────── project_scene_change_record ───────────────────────
def test_project_scene_change_happy_path() -> None:
    """A well-formed scene-change payload keeps its match flag and inlier count."""
    # Arrange
    payload = {"image_id": "AD000001", "matched": True, "inlier_count": 120}

    # Act
    projected = re_.project_scene_change_record(payload)

    # Assert
    assert projected is not None
    assert projected.inlier_count == 120
    assert projected.matched is True
def test_project_scene_change_inlier_count_missing_is_none() -> None:
    """An absent ``inlier_count`` projects to ``None`` rather than failing."""
    # Arrange
    payload = {"image_id": "AD000001", "matched": False}

    # Act
    projected = re_.project_scene_change_record(payload)

    # Assert
    assert projected is not None
    assert projected.inlier_count is None
def test_project_scene_change_inlier_count_bool_is_none() -> None:
    """A boolean ``inlier_count`` is treated as missing, not as the integer 1."""
    # Arrange — bool is technically int in Python, but treat as missing for inlier_count
    payload = {"image_id": "AD000001", "matched": True, "inlier_count": True}

    # Act
    projected = re_.project_scene_change_record(payload)

    # Assert
    assert projected is not None
    assert projected.inlier_count is None
def test_project_scene_change_matched_not_bool_returns_none() -> None:
    """A non-boolean ``matched`` value makes the payload unprojectable."""
    # Arrange
    payload = {"image_id": "AD000001", "matched": "yes"}

    # Act / Assert
    assert re_.project_scene_change_record(payload) is None
def test_project_scene_change_non_dict_returns_none() -> None:
    """Non-dict payloads (``None``, a bare string) are rejected."""
    # Act / Assert
    for malformed in (None, "nope"):
        assert re_.project_scene_change_record(malformed) is None
# ─────────────────────── iter_*_payloads ───────────────────────
class _StubRecord:
def __init__(self, record_type: str, payload: Any) -> None:
self.record_type = record_type
self.payload = payload
def test_iter_topk_payloads_filters_by_record_type() -> None:
    """Only ``retrieval-topk`` payloads are yielded, in stream order."""
    # Arrange — two top-K records interleaved with two other kinds
    stream = [
        _StubRecord("retrieval-topk", {"image_id": "AD000001"}),
        _StubRecord("scene-change-match", {"image_id": "AD000002"}),
        _StubRecord("retrieval-topk", {"image_id": "AD000003"}),
        _StubRecord("other-kind", {}),
    ]

    # Act
    yielded_ids = [payload["image_id"] for payload in re_.iter_topk_payloads(stream)]

    # Assert
    assert yielded_ids == ["AD000001", "AD000003"]
def test_iter_scene_change_payloads_filters_by_record_type() -> None:
    """Only ``scene-change-match`` payloads are yielded."""
    # Arrange
    topk_record = _StubRecord("retrieval-topk", {"image_id": "AD000001"})
    scene_record = _StubRecord("scene-change-match", {"image_id": "AD000002"})

    # Act
    yielded = list(re_.iter_scene_change_payloads([topk_record, scene_record]))

    # Assert
    assert yielded == [{"image_id": "AD000002"}]
+4
View File
@@ -55,6 +55,8 @@ E2E_ROOT = Path(__file__).resolve().parents[1]
"runner/helpers/tile_cache_inspector.py",
"runner/helpers/mid_flight_tile_evaluator.py",
"runner/helpers/mock_suite_sat_audit.py",
"runner/helpers/retrieval_evaluator.py",
"runner/helpers/aged_tile_rejection_evaluator.py",
"runner/helpers/cold_start_evaluator.py",
"runner/helpers/outlier_tolerance_evaluator.py",
"runner/helpers/outage_request_evaluator.py",
@@ -116,10 +118,12 @@ E2E_ROOT = Path(__file__).resolve().parents[1]
"tests/positive/test_ft_p_16_offline_only.py",
"tests/positive/test_ft_p_17_mid_flight_tiles.py",
"tests/positive/test_ft_p_18_no_raw_retention.py",
"tests/positive/test_ft_p_19_sat_reloc_scale.py",
"tests/negative/test_ft_n_01_outlier_tolerance.py",
"tests/negative/test_ft_n_02_sharp_turn_failure.py",
"tests/negative/test_ft_n_03_outage_reloc.py",
"tests/negative/test_ft_n_04_blackout_spoof.py",
"tests/negative/test_ft_n_05_stale_tile_rejection.py",
"tests/negative/test_ft_n_06_mid_flight_freshness.py",
],
)
@@ -0,0 +1,218 @@
"""Aged-tile rejection evaluator (AZ-427 / FT-N-05).
Pure-logic helpers for AC-8.2 / AC-NEW-6 negative scenario: when the
SUT mounts a tile cache whose manifest dates exceed the freshness
window for the configured sector, NO outbound emission may carry
``source_label = satellite_anchored``.
The AC accepts two distinct evidence shapes (per the task spec):
* **Signal A — load-time rejection**: an FDR ``tile-load-rejected``
record with ``reason == "stale"`` was emitted at startup for the
stale tile(s). Conceptually the SUT refused to load aged tiles at
all, so no later emission could be anchored on them.
* **Signal B — per-frame downgrade**: the SUT loaded the tiles but
the freshness gate downgrades every outbound emission to
``source_label ∈ {visual_propagated, dead_reckoned}``.
A run passes iff at least one signal holds AND no emission slipped
through with ``satellite_anchored``. A run fails the moment ANY frame
carries ``satellite_anchored`` regardless of which signal otherwise
held.
The scenario test pulls emissions from the SITL observer / FDR stream
and feeds a typed list to ``evaluate_aged_tile_rejection``; this
module decides whether the parsed inputs satisfy the AC.
Public-boundary discipline: NO imports from ``src/gps_denied_onboard``.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Sequence
from .estimate_schema import ALLOWED_SOURCE_LABELS
# ─────────────────────── FDR record kinds & constants ───────────────────────
# FDR record type and ``reason`` value that together identify a stale
# load-time tile rejection.
TILE_LOAD_REJECTED_FDR_KIND = "tile-load-rejected"
TILE_LOAD_REJECTED_STALE_REASON = "stale"

# The one source label that must never be emitted for aged tiles, and the
# labels that count as an acceptable downgrade.
SATELLITE_ANCHORED_LABEL = "satellite_anchored"
ACCEPTABLE_DOWNGRADE_LABELS: frozenset[str] = frozenset(
    ("visual_propagated", "dead_reckoned")
)

# Sector configuration values per AC-8.2, exposed so the scenario test can
# declare its sub-case configuration in the same vocabulary as the AC.
SECTOR_ACTIVE_CONFLICT = "active_conflict"
SECTOR_REAR = "rear"

# Sub-case bindings per AC-8.2 of the AZ-427 spec: synth-age fixture name
# paired with the sector value the SUT must be configured for. Used by the
# scenario test to declare its parameterisation in spec vocabulary.
AGED_FIXTURE_SECTOR_BINDINGS: tuple[tuple[str, str], ...] = (
    ("synth-age-7mo", SECTOR_ACTIVE_CONFLICT),
    ("synth-age-13mo", SECTOR_REAR),
)
@dataclass(frozen=True)
class SourceLabelEmission:
    """A single outbound estimate's observed source label.

    The scenario test chooses what ``frame_id`` means (a still-image
    filename, a per-frame counter, ...); this module never interprets it and
    only echoes it back in reports. ``label`` is the ``source_label`` value
    the SUT emitted, checked against ``ALLOWED_SOURCE_LABELS`` from
    ``estimate_schema`` during evaluation.
    """

    # Opaque per-frame identifier supplied by the caller.
    frame_id: str
    # ``source_label`` value emitted by the SUT for that frame.
    label: str
@dataclass(frozen=True)
class StaleTileRejection:
    """A single FDR ``tile-load-rejected`` event of interest (``reason == "stale"``)."""

    # Identifier of the tile the SUT refused to load.
    tile_id: str
    # Rejection reason; ``"stale"`` for every rejection this AC cares about.
    reason: str
@dataclass(frozen=True)
class AgedTileRejectionReport:
    """Outcome of one FT-N-05 sub-case (AC-1 / AC-2): aged tiles never anchor.

    How the verdict is reached:

    * Any frame labelled ``satellite_anchored`` → FAIL, unconditionally.
    * Any frame whose label is outside ``ALLOWED_SOURCE_LABELS`` → FAIL; this
      is a contract violation per FT-P-03 / AC-1.4 and is surfaced through
      ``illegal_labels``.
    * A stale rejection was recorded and nothing is anchored → PASS via
      Signal A.
    * No stale rejection, at least one emission, and every emission is in the
      downgrade set → PASS via Signal B.
    * No emissions and no stale rejection → FAIL, because nothing shows the
      freshness gate fired at all.
    """

    sub_case_id: str
    fixture: str
    sector: str
    emissions_observed: int
    anchored_frame_ids: tuple[str, ...]
    downgrade_frame_count: int
    illegal_labels: tuple[tuple[str, str], ...]  # (frame_id, label)
    stale_rejections: tuple[str, ...]  # tile_ids

    @property
    def has_stale_rejection(self) -> bool:
        """True when at least one stale tile rejection was recorded."""
        return len(self.stale_rejections) > 0

    @property
    def anchored_count(self) -> int:
        """Number of frames that carried ``satellite_anchored``."""
        return len(self.anchored_frame_ids)

    @property
    def signal_a_holds(self) -> bool:
        """Signal A: a stale load-time rejection exists and no frame is anchored."""
        if self.anchored_count != 0:
            return False
        return self.has_stale_rejection

    @property
    def signal_b_holds(self) -> bool:
        """Signal B: the SUT loaded the tiles but downgraded every emission.

        Needs at least one observed emission, all of them counted as
        downgrades, and none of them anchored.
        """
        if self.emissions_observed <= 0:
            return False
        if self.downgrade_frame_count != self.emissions_observed:
            return False
        return self.anchored_count == 0

    @property
    def passes(self) -> bool:
        """Overall verdict: no illegal label, nothing anchored, and a signal holds."""
        return (
            not self.illegal_labels
            and not self.anchored_count > 0
            and (self.signal_a_holds or self.signal_b_holds)
        )
def evaluate_aged_tile_rejection(
    sub_case_id: str,
    fixture: str,
    sector: str,
    emissions: Sequence[SourceLabelEmission],
    rejections: Iterable[StaleTileRejection],
) -> AgedTileRejectionReport:
    """Classify one sub-case's emissions and rejections into a report (AC-1 / AC-2).

    ``sub_case_id`` is the human-readable name the test gives this sub-case
    for traceability (e.g., "7mo_active_conflict"). ``fixture`` and
    ``sector`` are copied into the report unchanged.

    Each emission lands in at most one bucket: illegal (label not in
    ``ALLOWED_SOURCE_LABELS``), anchored, or downgraded. Only rejections
    whose reason is the stale reason contribute tile ids. The pass/fail
    verdict is then derived by the report's own properties.
    """
    # Labels outside the allowed vocabulary are reported and otherwise ignored.
    illegal_pairs = tuple(
        (em.frame_id, em.label)
        for em in emissions
        if em.label not in ALLOWED_SOURCE_LABELS
    )
    legal = [em for em in emissions if em.label in ALLOWED_SOURCE_LABELS]
    anchored_ids = tuple(
        em.frame_id for em in legal if em.label == SATELLITE_ANCHORED_LABEL
    )
    downgraded_total = sum(
        1
        for em in legal
        if em.label != SATELLITE_ANCHORED_LABEL
        and em.label in ACCEPTABLE_DOWNGRADE_LABELS
    )
    stale_ids = tuple(
        rejection.tile_id
        for rejection in rejections
        if rejection.reason == TILE_LOAD_REJECTED_STALE_REASON
    )
    return AgedTileRejectionReport(
        sub_case_id=sub_case_id,
        fixture=fixture,
        sector=sector,
        emissions_observed=len(emissions),
        anchored_frame_ids=anchored_ids,
        downgrade_frame_count=downgraded_total,
        illegal_labels=illegal_pairs,
        stale_rejections=stale_ids,
    )
# ─────────────────────── FDR record projection ───────────────────────
def project_stale_rejection_payload(payload: object) -> StaleTileRejection | None:
    """Turn one FDR ``tile-load-rejected`` payload into a ``StaleTileRejection``.

    A usable payload is a dict holding:

    * ``reason`` equal to ``"stale"`` — the only reason relevant to this AC
    * ``tile_id`` (or, when that is missing or falsy, ``id``) as a ``str``

    Anything else — a non-dict, a different reason, a missing or non-string
    id — yields ``None``; AC-NEW-6 cares only about the stale-rejection
    subset.
    """
    if not isinstance(payload, dict):
        return None
    declared_reason = payload.get("reason")
    if declared_reason != TILE_LOAD_REJECTED_STALE_REASON:
        return None
    # ``tile_id`` wins; ``id`` is only consulted when ``tile_id`` is falsy.
    tile_ref = payload.get("tile_id") or payload.get("id")
    return (
        StaleTileRejection(tile_id=tile_ref, reason=declared_reason)
        if isinstance(tile_ref, str)
        else None
    )
def iter_stale_rejection_payloads(records: Iterable[object]) -> Iterable[StaleTileRejection]:
    """Yield a ``StaleTileRejection`` for each well-formed stale rejection record.

    Records whose ``record_type`` attribute is not the tile-load-rejected
    kind are skipped, as are records whose ``payload`` does not project.
    Both attributes are read defensively and default to ``None``.
    """
    for record in records:
        kind = getattr(record, "record_type", None)
        projected = (
            None
            if kind != TILE_LOAD_REJECTED_FDR_KIND
            else project_stale_rejection_payload(getattr(record, "payload", None))
        )
        if projected is not None:
            yield projected
+380
View File
@@ -0,0 +1,380 @@
"""Top-K retrieval evaluator + scene-change PARTIAL recorder (AZ-423 / FT-P-19).
Two pure-logic helpers feeding AC-8.6 (AZ-423):
* **AC-1 — scale-ratio retrievability**: for each of the 60
``still-image-set-60`` images the SUT runs top-K=10 retrieval against
its tile cache. The AC passes iff EVERY image has at least one
retrieved tile whose centre lies within 100 m of the image's true
centre. This is a "set_contains" check — we do NOT care which rank
the matching tile occupies, only that it appears in the top-K.
* **AC-2 — scene-change subset PARTIAL**: for the 2 paired
``_gmaps.png`` reference images the cross-domain matcher runs; the
helper records the boolean outcome AND tags the subset's overall
result as ``PARTIAL`` unconditionally — because N=2 is too small to
yield a meaningful pass/fail statistic, and the traceability matrix
documents this AC as PARTIAL irrespective of count.
The scenario test pulls per-frame retrieval candidates from the FDR
``retrieval-topk`` record stream and per-image scene-change outcomes
from the cross-domain matcher's FDR record stream; this module only
decides whether the parsed inputs satisfy the AC.
Public-boundary discipline: NO imports from ``src/gps_denied_onboard``.
"""
from __future__ import annotations
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
from .geo import distance_m
# ─────────────────────── FDR record kinds & schema ───────────────────────
# FDR record types consumed by this module.
RETRIEVAL_TOPK_FDR_KIND = "retrieval-topk"
SCENE_CHANGE_MATCH_FDR_KIND = "scene-change-match"

# AC-1 parameters: retrieval depth and the distance budget in metres.
TOP_K_REQUIRED = 10
TOP_K_DISTANCE_TOLERANCE_M = 100.0

# Scene-change-pair convention: image `AD<NNNNNN>.jpg` is paired with
# `AD<NNNNNN>_gmaps.png`. The project's static fixture set
# (`still-image-sat-refs-2`) contains only 2 such pairs.
SCENE_CHANGE_PAIRED_IMAGE_IDS: tuple[str, ...] = (
    "AD000001",
    "AD000002",
)
SCENE_CHANGE_SUBSET_PARTIAL_LABEL = "PARTIAL"
@dataclass(frozen=True)
class CandidateTile:
    """A single tile returned by the SUT's top-K retrieval.

    The two coordinates are the WGS84 centre of the tile's footprint, per
    ``TileMetadata.centre_wgs84``.
    """

    # Identifier of the retrieved tile.
    tile_id: str
    # Latitude of the tile centre, in degrees.
    centre_lat_deg: float
    # Longitude of the tile centre, in degrees.
    centre_lon_deg: float
@dataclass(frozen=True)
class TopKQuery:
    """The top-K=10 retrieval result for one image, with its ground-truth centre.

    The scenario test builds one instance per image; the evaluator then
    decides AC-1 for each image and for the whole set.
    """

    # Identifier of the queried image.
    image_id: str
    # Ground-truth centre of the image, in degrees.
    true_centre_lat_deg: float
    true_centre_lon_deg: float
    # Tiles the SUT retrieved for this image; may be empty.
    candidates: tuple[CandidateTile, ...]
@dataclass(frozen=True)
class TopKImageReport:
    """AC-1 outcome for a single image.

    ``min_distance_m`` holds the distance in metres from the true centre to
    the nearest candidate. It is ``None`` when the query had no candidates,
    which the evaluator counts as a failure.
    """

    # Identifier of the evaluated image.
    image_id: str
    # How many candidates the query carried.
    candidate_count: int
    # Distance to the nearest candidate, or None when there were none.
    min_distance_m: float | None
    # Whether the nearest candidate is within the tolerance.
    pass_distance: bool
@dataclass(frozen=True)
class TopKAggregateReport:
    """AC-1 of FT-P-19 over the whole image set: every top-K covers the true centre."""

    # One per-image outcome per evaluated query, in query order.
    entries: tuple[TopKImageReport, ...]
    # Distance tolerance, in metres, that the entries were judged against.
    max_distance_m: float
    # Number of images the run was expected to cover.
    expected_image_count: int

    @property
    def pass_count(self) -> int:
        """Number of images whose nearest candidate is within tolerance."""
        return len([entry for entry in self.entries if entry.pass_distance])

    @property
    def failing_entries(self) -> tuple[TopKImageReport, ...]:
        """Per-image outcomes that did not pass, in their original order."""
        return tuple(filter(lambda entry: not entry.pass_distance, self.entries))

    @property
    def passes(self) -> bool:
        """True iff exactly the expected number of images ran and all of them passed."""
        expected = self.expected_image_count
        return len(self.entries) == expected and self.pass_count == expected
def evaluate_top_k_within_distance(
    queries: Sequence[TopKQuery],
    *,
    max_distance_m: float = TOP_K_DISTANCE_TOLERANCE_M,
    expected_image_count: int = 60,
) -> TopKAggregateReport:
    """AC-1: check that each image's candidates include a tile close to GT.

    For every query the distance from the true centre to each candidate
    centre is obtained from ``distance_m``; the smallest one decides the
    image. An image passes iff that smallest distance is at most
    ``max_distance_m``. A query with no candidates is reported with
    ``min_distance_m=None`` and fails.

    The returned aggregate carries the per-image reports in input order
    together with ``max_distance_m`` and ``expected_image_count``; its own
    ``passes`` property makes the overall decision.

    Raises ``ValueError`` on ``max_distance_m <= 0``.
    """
    if max_distance_m <= 0:
        raise ValueError(f"max_distance_m must be > 0, got {max_distance_m}")

    per_image: list[TopKImageReport] = []
    for query in queries:
        if not query.candidates:
            # Nothing retrieved: no distance to report, automatic failure.
            nearest_m = None
            tile_count = 0
            within_budget = False
        else:
            nearest_m = min(
                distance_m(
                    query.true_centre_lat_deg,
                    query.true_centre_lon_deg,
                    tile.centre_lat_deg,
                    tile.centre_lon_deg,
                )
                for tile in query.candidates
            )
            tile_count = len(query.candidates)
            within_budget = nearest_m <= max_distance_m
        per_image.append(
            TopKImageReport(
                image_id=query.image_id,
                candidate_count=tile_count,
                min_distance_m=nearest_m,
                pass_distance=within_budget,
            )
        )

    return TopKAggregateReport(
        entries=tuple(per_image),
        max_distance_m=max_distance_m,
        expected_image_count=expected_image_count,
    )
# ─────────────────────── AC-2 scene-change subset ───────────────────────
@dataclass(frozen=True)
class SceneChangeMatch:
    """Cross-domain matcher outcome for one paired image."""

    # Bare image stem, e.g. "AD000001"; pairs implicitly with
    # `<image_id>_gmaps.png`.
    image_id: str
    # Whether the matcher reported a successful match.
    matched: bool
    # Informational only; ``None`` when the matcher didn't report it.
    inlier_count: int | None
@dataclass(frozen=True)
class SceneChangeSubsetReport:
    """AC-2 of FT-P-19: the scene-change subset is structurally PARTIAL.

    ``overall_label`` defaults to ``PARTIAL`` and no property of this class
    derives it from the data: with only two paired images the sample is
    too small for a meaningful pass/fail statistic, and the traceability
    matrix records AC-8.6 as PARTIAL whatever the outcome count.
    """

    entries: tuple[SceneChangeMatch, ...]
    expected_image_ids: tuple[str, ...] = SCENE_CHANGE_PAIRED_IMAGE_IDS
    overall_label: str = SCENE_CHANGE_SUBSET_PARTIAL_LABEL

    @property
    def matched_count(self) -> int:
        """Number of entries whose matcher outcome is truthy."""
        return len([entry for entry in self.entries if entry.matched])

    @property
    def coverage_complete(self) -> bool:
        """True iff the observed image ids equal the expected ids exactly.

        This is a structural completeness check, independent of whether
        the matcher succeeded. The comparison is a set equality, so it is
        false both when an expected id is missing and when an entry
        carries an id outside ``expected_image_ids``.
        """
        wanted = set(self.expected_image_ids)
        seen = {entry.image_id for entry in self.entries}
        return wanted == seen
def evaluate_scene_change_subset(
    matches: Sequence[SceneChangeMatch],
    *,
    expected_image_ids: Sequence[str] = SCENE_CHANGE_PAIRED_IMAGE_IDS,
) -> SceneChangeSubsetReport:
    """AC-2: wrap the paired-image matcher outcomes in a PARTIAL report.

    No pass/fail counting happens here. Both inputs are frozen into tuples
    and handed to ``SceneChangeSubsetReport``, whose label default supplies
    the PARTIAL annotation — it comes from the spec, not from the data.
    """
    collected = tuple(matches)
    wanted_ids = tuple(expected_image_ids)
    return SceneChangeSubsetReport(entries=collected, expected_image_ids=wanted_ids)
# ─────────────────────── CSV evidence emission ───────────────────────
# Column order of the AC-1 evidence CSV; ``write_top_k_csv`` emits this as
# the first row and then one row per ``TopKImageReport`` in the same order.
TOP_K_CSV_HEADER: tuple[str, ...] = (
"image_id",
"candidate_count",
"min_distance_m",
"pass_distance",
)
# Column order of the AC-2 evidence CSV written by ``write_scene_change_csv``;
# ``subset_label`` repeats the report's ``overall_label`` on every row.
SCENE_CHANGE_CSV_HEADER: tuple[str, ...] = (
"image_id",
"matched",
"inlier_count",
"subset_label",
)
def write_top_k_csv(path: Path, report: TopKAggregateReport) -> None:
    """Dump the per-image AC-1 verdicts to ``path`` as CSV.

    The file is opened in write mode (UTF-8, ``\\n`` line terminator), so
    an existing file is overwritten. The header row is ``TOP_K_CSV_HEADER``.
    A missing ``min_distance_m`` becomes an empty cell, otherwise it is
    formatted with four decimals; ``pass_distance`` is written as
    ``true`` / ``false``.

    Raises ``OSError`` if the parent directory does not exist.
    """

    def _as_row(entry: TopKImageReport) -> list[object]:
        # One CSV row in TOP_K_CSV_HEADER column order.
        if entry.min_distance_m is None:
            distance_cell = ""
        else:
            distance_cell = f"{entry.min_distance_m:.4f}"
        verdict_cell = "true" if entry.pass_distance else "false"
        return [entry.image_id, entry.candidate_count, distance_cell, verdict_cell]

    with path.open("w", encoding="utf-8", newline="") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerow(TOP_K_CSV_HEADER)
        out.writerows(_as_row(entry) for entry in report.entries)
def write_scene_change_csv(path: Path, report: SceneChangeSubsetReport) -> None:
    """Dump the paired-image AC-2 outcomes to ``path`` as CSV.

    The file is opened in write mode (UTF-8, ``\\n`` line terminator) and
    starts with ``SCENE_CHANGE_CSV_HEADER``. ``matched`` is written as
    ``true`` / ``false`` and a missing ``inlier_count`` as an empty cell.
    The report's ``overall_label`` is repeated on every row so downstream
    consumers can group or filter by it without joining tables.
    """
    subset_label = report.overall_label

    def _as_row(entry: SceneChangeMatch) -> list[object]:
        # One CSV row in SCENE_CHANGE_CSV_HEADER column order.
        matched_cell = "true" if entry.matched else "false"
        inlier_cell = "" if entry.inlier_count is None else entry.inlier_count
        return [entry.image_id, matched_cell, inlier_cell, subset_label]

    with path.open("w", encoding="utf-8", newline="") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerow(SCENE_CHANGE_CSV_HEADER)
        out.writerows(_as_row(entry) for entry in report.entries)
# ─────────────────────── FDR record projection ───────────────────────
def _finite_coordinate(value: object) -> float | None:
    """Return ``value`` as a finite float, or ``None`` if it is unusable."""
    import math

    # ``bool`` is a subclass of ``int``; True/False are never coordinates.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        as_float = float(value)
    except OverflowError:
        # Integer too large to be represented as a float.
        return None
    return as_float if math.isfinite(as_float) else None


def project_topk_record_to_query(
    payload: object, true_centre_lat_deg: float, true_centre_lon_deg: float
) -> TopKQuery | None:
    """Project one FDR ``retrieval-topk`` payload onto a ``TopKQuery``.

    The payload is expected to carry:

    * ``image_id`` (str)
    * ``candidates`` — list[dict] of {tile_id, centre_lat_deg, centre_lon_deg}

    Returns ``None`` when the payload itself is malformed (not a dict,
    ``image_id`` not a string, ``candidates`` not a list/tuple).

    Individual candidates that are malformed are dropped rather than
    failing the whole record: non-dict items, a non-string ``tile_id``, or
    a coordinate that is not a real number. Booleans, NaN and infinities
    are rejected as coordinates, because a non-finite value would
    otherwise flow into the distance computation and could mask a valid
    candidate in the same record. A record whose candidates were all
    dropped yields a ``TopKQuery`` with an empty ``candidates`` tuple.
    """
    if not isinstance(payload, dict):
        return None
    image_id = payload.get("image_id")
    raw_candidates = payload.get("candidates")
    if not isinstance(image_id, str) or not isinstance(raw_candidates, (list, tuple)):
        return None

    candidates: list[CandidateTile] = []
    for raw in raw_candidates:
        if not isinstance(raw, dict):
            continue
        tile_id = raw.get("tile_id")
        lat = _finite_coordinate(raw.get("centre_lat_deg"))
        lon = _finite_coordinate(raw.get("centre_lon_deg"))
        if not isinstance(tile_id, str) or lat is None or lon is None:
            continue
        candidates.append(
            CandidateTile(tile_id=tile_id, centre_lat_deg=lat, centre_lon_deg=lon)
        )

    return TopKQuery(
        image_id=image_id,
        true_centre_lat_deg=true_centre_lat_deg,
        true_centre_lon_deg=true_centre_lon_deg,
        candidates=tuple(candidates),
    )
def project_scene_change_record(payload: object) -> SceneChangeMatch | None:
    """Project one FDR ``scene-change-match`` payload onto a ``SceneChangeMatch``.

    Expected payload shape:

    * ``image_id`` (str)
    * ``matched`` (bool)
    * ``inlier_count`` (int | None) — optional

    Returns ``None`` when the payload is not a dict, ``image_id`` is not a
    string, or ``matched`` is not a bool. An ``inlier_count`` that is
    absent, a bool, or not an ``int`` is recorded as ``None`` instead of
    rejecting the record.
    """
    if not isinstance(payload, dict):
        return None

    image_id = payload.get("image_id")
    matched = payload.get("matched")
    if not (isinstance(image_id, str) and isinstance(matched, bool)):
        return None

    raw_inliers = payload.get("inlier_count")
    # bool is a subclass of int, so it has to be excluded explicitly.
    is_plain_int = isinstance(raw_inliers, int) and not isinstance(raw_inliers, bool)
    inlier_count: int | None = raw_inliers if is_plain_int else None

    return SceneChangeMatch(image_id=image_id, matched=matched, inlier_count=inlier_count)
def iter_topk_payloads(records: Iterable[object]) -> Iterable[object]:
    """Lazily yield the payload of every ``retrieval-topk`` FDR record.

    A record is selected when its ``record_type`` attribute equals
    ``RETRIEVAL_TOPK_FDR_KIND``; records without that attribute are
    skipped. A selected record lacking a ``payload`` attribute yields
    ``None``.
    """
    for record in records:
        if getattr(record, "record_type", None) != RETRIEVAL_TOPK_FDR_KIND:
            continue
        yield getattr(record, "payload", None)
def iter_scene_change_payloads(records: Iterable[object]) -> Iterable[object]:
    """Lazily yield the payload of every ``scene-change-match`` FDR record.

    A record is selected when its ``record_type`` attribute equals
    ``SCENE_CHANGE_MATCH_FDR_KIND``; records without that attribute are
    skipped. A selected record lacking a ``payload`` attribute yields
    ``None``.
    """
    for record in records:
        if getattr(record, "record_type", None) != SCENE_CHANGE_MATCH_FDR_KIND:
            continue
        yield getattr(record, "payload", None)
@@ -0,0 +1,168 @@
"""FT-N-05 — Stale-tile rejection on freshness violation (AZ-427 / AC-8.2, AC-NEW-6).
Two sub-cases:
* ``synth-age-7mo`` mounted, SUT configured for active-conflict sector.
* ``synth-age-13mo`` mounted, SUT configured for rear sector.
For each sub-case: push the 60 still images, then assert one of:
1. **Signal A** — the FDR contains at least one ``tile-load-rejected``
record with ``reason == "stale"`` AND no outbound emission carried
``source_label = satellite_anchored``.
2. **Signal B** — the SUT loaded the cache but every emission falls in
``{visual_propagated, dead_reckoned}``.
Either signal is acceptable; the AC fails the moment any frame slips
through with ``satellite_anchored`` from these aged tiles.
What this file owns:
* The AC-1 / AC-2 / AC-3 wiring for both sub-cases.
* Sector-binding declaration via ``AGED_FIXTURE_SECTOR_BINDINGS``.
What this file does NOT own:
* The frame-source push → ``runner.helpers.frame_source_replay`` (stub).
* The SITL message receipt → ``runner.helpers.sitl_observer`` (stub).
* The fixture-builder support for mounting ``synth-age-*`` tiles and
switching sector configuration — surfaced as a production-dependency
finding in the batch report (gated on ``sitl_replay_ready`` +
``E2E_FT_N_05_FIXTURE`` env var; the test skips cleanly when not set
and fails loudly when set but the records are missing).
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from runner.helpers import accuracy_evaluator as ae
from runner.helpers import aged_tile_rejection_evaluator as ate
# Name of the environment variable that declares which synth-age fixture is
# currently mounted; the test skips when it is unset or names another fixture.
FT_N_05_FIXTURE_ENV = "E2E_FT_N_05_FIXTURE"
# Ground-truth coordinates CSV, located under ``_docs/00_problem/input_data``
# three levels above this file's directory. The test globs the still images
# (``AD??????.jpg``) from the same directory.
GT_CSV = Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "coordinates.csv"
@pytest.mark.traces_to("AC-8.2,AC-NEW-6,AC-1,AC-2,AC-3")
@pytest.mark.parametrize(
    "fixture, sector",
    ate.AGED_FIXTURE_SECTOR_BINDINGS,
    ids=[f"{f}_{s}" for f, s in ate.AGED_FIXTURE_SECTOR_BINDINGS],
)
def test_ft_n_05_stale_tile_rejection(
    fixture: str,
    sector: str,
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Full FT-N-05 sub-case (AC-1 or AC-2 depending on ``fixture``).

    Flow:

    1. Skip unless a SITL replay fixture is ready and the mounted synth-age
       fixture (``E2E_FT_N_05_FIXTURE``) matches this parametrised sub-case.
    2. Replay every still image and collect the ``source_label`` of each
       outbound emission. A frame that produces no emission within the
       timeout is tolerated but remembered for the failure message.
    3. Read the stale-rejection records from the run's FDR directory.
    4. Let ``aged_tile_rejection_evaluator`` decide, record the metrics, and
       assert both the label contract and the pass verdict.

    ``fixture`` / ``sector`` come from ``AGED_FIXTURE_SECTOR_BINDINGS``; the
    remaining arguments are pytest fixtures defined outside this file.
    ``vio_strategy`` is not read in the body — presumably it only drives
    the parametrisation matrix (confirm against conftest).
    """
    if not sitl_replay_ready:
        pytest.skip(
            "FT-N-05 requires `E2E_SITL_REPLAY_DIR` to point at a SITL replay "
            "fixture mounting one of the synth-age tile sets (AZ-595 + AZ-427 "
            "fixture builder). Pure-logic AC-8.2/AC-NEW-6 coverage lives in "
            "e2e/_unit_tests/helpers/test_aged_tile_rejection_evaluator.py."
        )
    active_fixture = os.environ.get(FT_N_05_FIXTURE_ENV)
    if active_fixture is None:
        pytest.skip(
            f"FT-N-05 needs `{FT_N_05_FIXTURE_ENV}` env var to declare which "
            f"synth-age fixture the fixture builder mounted "
            f"(one of {[f for f, _ in ate.AGED_FIXTURE_SECTOR_BINDINGS]}). "
            "The fixture builder publishes this from its per-sub-case orchestration."
        )
    if active_fixture != fixture:
        pytest.skip(
            f"FT-N-05 sub-case [{fixture}/{sector}] does not match the mounted "
            f"fixture (`{FT_N_05_FIXTURE_ENV}` = {active_fixture!r}). "
            "Subsequent matrix sub-cases will run in their own pytest invocations."
        )

    # Imported late so collection and the skip paths above do not depend on
    # the replay helpers being importable.
    from runner.helpers import fdr_reader, sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    image_paths = sorted(GT_CSV.parent.glob("AD??????.jpg"))
    if len(image_paths) != ae.TOTAL_IMAGES_REQUIRED:
        pytest.fail(
            f"FT-N-05 expects {ae.TOTAL_IMAGES_REQUIRED} still images, "
            f"found {len(image_paths)} under {GT_CSV.parent}"
        )

    sitl_host = "sitl-ardupilot" if fc_adapter == "ardupilot" else "sitl-inav"
    observer = sitl_observer.get_observer(fc_kind=fc_adapter, host=sitl_host)
    sink = _resolve_frame_sink()
    replayer = FrameSourceReplayer(sink)

    emissions: list[ate.SourceLabelEmission] = []
    timed_out_frames: list[str] = []
    per_image_timeout_s = 5.0
    for path in image_paths:
        image_id = path.name
        replayer.replay_image(path)
        try:
            msg = observer.wait_for_outbound(timeout_s=per_image_timeout_s)
        except TimeoutError:
            # A frame without an emission is not a failure by itself, but it
            # is remembered so a failing run shows how much was observed.
            timed_out_frames.append(image_id)
            continue
        label = getattr(msg, "source_label", None)
        if not isinstance(label, str):
            pytest.fail(
                f"FT-N-05: outbound emission for {image_id} carries no `source_label` "
                f"(got {type(label).__name__}). The SUT must include `source_label` per "
                "FT-P-03 / AC-1.4 even on downgraded frames."
            )
        emissions.append(ate.SourceLabelEmission(frame_id=image_id, label=label))

    fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr"
    rejections = list(ate.iter_stale_rejection_payloads(fdr_reader.iter_records(fdr_root)))

    sub_case_id = f"{fixture}_{sector}"
    report = ate.evaluate_aged_tile_rejection(
        sub_case_id=sub_case_id,
        fixture=fixture,
        sector=sector,
        emissions=emissions,
        rejections=rejections,
    )

    metric_prefix = f"ft_n_05.{sub_case_id}"
    nfr_recorder.record_metric(
        f"{metric_prefix}.emissions_observed", float(report.emissions_observed), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        f"{metric_prefix}.anchored_count", float(report.anchored_count), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        f"{metric_prefix}.downgrade_count", float(report.downgrade_frame_count), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        f"{metric_prefix}.stale_rejection_count",
        float(len(report.stale_rejections)),
        ac_id="AC-1",
    )

    # The label contract is checked first: the freshness verdict is only
    # meaningful when every label is one the contract allows.
    assert not report.illegal_labels, (
        "FT-N-05: outbound emissions include labels outside the FT-P-03 contract: "
        f"{report.illegal_labels}. Fix the SUT before assessing the freshness gate."
    )
    assert report.passes, (
        f"FT-N-05 [{sub_case_id}] failed: signal_a={report.signal_a_holds}, "
        f"signal_b={report.signal_b_holds}; "
        f"anchored_frames={list(report.anchored_frame_ids)[:5]}, "
        f"emissions={report.emissions_observed}, "
        f"downgrade_count={report.downgrade_frame_count}, "
        f"stale_rejections={list(report.stale_rejections)[:5]}, "
        f"timed_out_frames={timed_out_frames[:5]} (total {len(timed_out_frames)})"
    )
def _resolve_frame_sink():  # type: ignore[no-untyped-def]
    """Build the frame sink used for replay: a fresh ``NullFrameSink``.

    The import is local to the call. Per the original note this is the
    replay-mode, counter-only `FrameSink` (AZ-597).
    """
    from runner.helpers.replay_mode import NullFrameSink

    sink = NullFrameSink()
    return sink
@@ -0,0 +1,149 @@
"""FT-P-19 — Satellite-relocalization scale-ratio + scene-change PARTIAL (AZ-423 / AC-8.6).
The full scenario:
1. Push each ``AD0000NN.jpg`` from ``still-image-set-60`` to the SUT's
frame source, one at a time. For each image, the SUT runs top-K=10
tile-cache retrieval and writes an FDR ``retrieval-topk`` record
carrying the 10 candidate tile centres.
2. The test joins the per-image GT (``coordinates.csv``) with the FDR
stream and asserts that EVERY image's top-K covers a tile centre
within 100 m of the image's true centre (``AC-1`` / set_contains).
3. For the 2 paired ``_gmaps.png`` reference images the SUT runs the
cross-domain matcher; the test records per-image match success
into a CSV and tags the subset as ``PARTIAL`` (``AC-2``).
4. ``AC-3`` — parameterisation across ``(fc_adapter, vio_strategy)``.
What this file owns:
* The AC-1 / AC-2 / AC-3 wiring above.
* CSV evidence emission via the AZ-423-owned ``retrieval_evaluator``.
What this file does NOT own:
* The frame-source push → ``runner.helpers.frame_source_replay`` (stub,
AZ-441) — skip-gated.
* The SITL message receipt → ``runner.helpers.sitl_observer`` (stub;
AZ-416/AZ-417) — skip-gated.
* The SUT-side ``retrieval-topk`` + ``scene-change-match`` FDR record
emission — production gap, surfaced via fail-loud when the fixture
exists but the records are missing.
When ``E2E_SITL_REPLAY_DIR`` is set and points at a prepared SITL
replay fixture, this file's runtime path activates automatically.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from runner.helpers import accuracy_evaluator as ae
from runner.helpers import retrieval_evaluator as re_
# Ground-truth coordinates CSV, located under ``_docs/00_problem/input_data``
# three levels above this file's directory; loaded via
# ``accuracy_evaluator.load_gt_coordinates`` in the scenario below.
GT_CSV = Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "coordinates.csv"
# Directory holding ``coordinates.csv``.
# NOTE(review): presumably also the location of the still images — this name
# is not referenced by the scenario in this file; confirm it is still needed.
STILL_IMAGES_DIR = GT_CSV.parent
@pytest.mark.traces_to("AC-8.6,AC-1,AC-2,AC-3")
def test_ft_p_19_sat_reloc_scale(
    fc_adapter: str,
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Full FT-P-19 scenario (AC-8.6).

    AC-1: every image's top-K=10 includes a tile centre within 100 m of GT.
    AC-2: paired _gmaps.png subset → PARTIAL.
    AC-3: parametrised across ``(fc_adapter, vio_strategy)``.

    Flow: skip unless the SITL replay fixture is ready; join the
    ``retrieval-topk`` FDR payloads with the ground-truth rows by image id;
    project the ``scene-change-match`` payloads; write both evidence CSVs;
    record the metrics; assert AC-1 and AC-2.

    FDR records that cannot be used (non-dict payload, missing ``image_id``,
    no ground-truth row, failed projection) are not evaluated, but they are
    collected and reported in the failure messages so a failing run shows
    why fewer images than expected were evaluated.

    All arguments are pytest fixtures defined outside this file.
    """
    if not sitl_replay_ready:
        pytest.skip(
            "FT-P-19 requires `E2E_SITL_REPLAY_DIR` to point at a SITL replay "
            "fixture emitting `retrieval-topk` + `scene-change-match` FDR "
            "records (AZ-595 + AZ-423 fixture builder). Pure-logic AC-8.6 "
            "coverage lives in e2e/_unit_tests/helpers/test_retrieval_evaluator.py."
        )

    # Imported late so collection and the skip path above do not depend on
    # the FDR reader being importable.
    from runner.helpers import fdr_reader

    gt_rows = ae.load_gt_coordinates(GT_CSV)
    gt_by_id = {_normalise_image_id(r.image_id): r for r in gt_rows}
    fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr"

    topk_queries: list[re_.TopKQuery] = []
    dropped_topk_records: list[str] = []
    for payload in re_.iter_topk_payloads(fdr_reader.iter_records(fdr_root)):
        if not isinstance(payload, dict):
            dropped_topk_records.append(f"<{type(payload).__name__} payload>")
            continue
        image_id_raw = payload.get("image_id")
        if not isinstance(image_id_raw, str):
            dropped_topk_records.append("<no image_id>")
            continue
        image_key = _normalise_image_id(image_id_raw)
        gt = gt_by_id.get(image_key)
        if gt is None:
            dropped_topk_records.append(f"{image_id_raw} (no GT row)")
            continue
        q = re_.project_topk_record_to_query(
            payload, true_centre_lat_deg=gt.lat_deg, true_centre_lon_deg=gt.lon_deg
        )
        if q is None:
            dropped_topk_records.append(f"{image_id_raw} (malformed)")
            continue
        topk_queries.append(q)

    scene_change_matches: list[re_.SceneChangeMatch] = []
    dropped_scene_change_records = 0
    for payload in re_.iter_scene_change_payloads(fdr_reader.iter_records(fdr_root)):
        m = re_.project_scene_change_record(payload)
        if m is None:
            dropped_scene_change_records += 1
            continue
        scene_change_matches.append(m)

    if not topk_queries:
        pytest.fail(
            f"FT-P-19: no `{re_.RETRIEVAL_TOPK_FDR_KIND}` FDR records found at "
            f"{fdr_root}. SUT must emit a top-K retrieval record per pushed "
            "image; fixture builder must collect them. "
            f"Unusable records dropped: {len(dropped_topk_records)} "
            f"{dropped_topk_records[:5]}"
        )

    topk_report = re_.evaluate_top_k_within_distance(
        topk_queries, expected_image_count=ae.TOTAL_IMAGES_REQUIRED
    )
    scene_change_report = re_.evaluate_scene_change_subset(scene_change_matches)

    # Evidence is written before the assertions so it exists for failing runs.
    top_k_csv = Path(evidence_dir) / f"ft-p-19-topk-{fc_adapter}-{vio_strategy}.csv"
    scene_change_csv = Path(evidence_dir) / f"ft-p-19-scene-change-{fc_adapter}-{vio_strategy}.csv"
    re_.write_top_k_csv(top_k_csv, topk_report)
    re_.write_scene_change_csv(scene_change_csv, scene_change_report)

    nfr_recorder.record_metric(
        "ft_p_19.topk_pass_count", float(topk_report.pass_count), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        "ft_p_19.topk_image_count", float(len(topk_report.entries)), ac_id="AC-1"
    )
    nfr_recorder.record_metric(
        "ft_p_19.scene_change_matched_count",
        float(scene_change_report.matched_count),
        ac_id="AC-2",
    )

    assert topk_report.passes, (
        f"AC-1 (top-K=10 within {topk_report.max_distance_m} m for {topk_report.expected_image_count} images) failed: "
        f"pass_count={topk_report.pass_count}/{topk_report.expected_image_count}, "
        f"evaluated={len(topk_report.entries)}, "
        f"failing={[(e.image_id, e.min_distance_m) for e in topk_report.failing_entries[:5]]}, "
        f"dropped_records={len(dropped_topk_records)} {dropped_topk_records[:5]}"
    )
    assert scene_change_report.coverage_complete, (
        "AC-2 (scene-change subset coverage) failed: paired images observed = "
        f"{[e.image_id for e in scene_change_report.entries]}, expected = "
        f"{list(scene_change_report.expected_image_ids)}, "
        f"malformed records dropped = {dropped_scene_change_records}"
    )
    assert scene_change_report.overall_label == re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL, (
        f"AC-2 (PARTIAL annotation): got {scene_change_report.overall_label!r}, "
        f"expected {re_.SCENE_CHANGE_SUBSET_PARTIAL_LABEL!r}"
    )
def _normalise_image_id(image_id: str) -> str:
    """Return ``image_id`` without a trailing ``.jpg`` (matched case-insensitively).

    ``coordinates.csv`` uses the bare stem, so ids that arrive with the
    extension are trimmed; any other id is returned unchanged.
    """
    suffix = ".jpg"
    if not image_id.lower().endswith(suffix):
        return image_id
    return image_id[: -len(suffix)]