mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 09:01:14 +00:00
Compare commits
2 Commits
7fb3cb3f34
...
b0296da911
| Author | SHA1 | Date | |
|---|---|---|---|
| b0296da911 | |||
| bb744d9078 |
@@ -0,0 +1,194 @@
|
||||
# Batch 81 Report — FT-P-12 + FT-P-13 GCS downsample + command path
|
||||
|
||||
**Batch**: 81
|
||||
**Date**: 2026-05-17
|
||||
**Context**: Test implementation (greenfield Step 10 — Implement Tests)
|
||||
**Tasks**: AZ-420 (3 cp) — single scenario task covering FT-P-12 + FT-P-13
|
||||
**Cycle**: 1
|
||||
**Verdict**: COMPLETE — PASS_WITH_WARNINGS (self-reviewed; see `reviews/batch_81_review.md`)
|
||||
|
||||
## Summary
|
||||
|
||||
Implements the GCS-leg blackbox scenarios under epic AZ-262:
|
||||
|
||||
* **FT-P-12** — SUT→GCS summary stream cadence (`AC-6.1`). The C8
|
||||
`QgcTelemetryAdapter` pairs `GLOBAL_POSITION_INT` + `NAMED_VALUE_FLOAT`
|
||||
at the configured `summary_rate_hz`; the test parses the
|
||||
`mavproxy-listener`-captured tlog over a 60 s Derkachi replay and
|
||||
asserts the observed `GLOBAL_POSITION_INT` rate lands in [1, 2] Hz.
|
||||
* **FT-P-13** — GCS-originated operator re-loc hint (`AC-6.2`). A
|
||||
`STATUSTEXT` carrying `RELOC:<lat>,<lon>,<radius_m>` is injected
|
||||
while the SUT is in `dead_reckoned`; the SUT must (a) acknowledge
|
||||
via an FDR `c8.gcs.operator_command` record within ≤ 2 s, (b) bias
|
||||
its next `anchor_search_region` toward the hint, (c) not reject
|
||||
the well-formed hint with a security/auth STATUSTEXT.
|
||||
|
||||
### AZ-420 — FT-P-12 + FT-P-13 (3 cp)
|
||||
|
||||
* **`e2e/runner/helpers/gcs_telemetry_evaluator.py`** (new, 430 lines):
|
||||
pure-logic evaluators sourced from the GCS tlog + FDR archive.
|
||||
* `compute_gcs_summary_rate(messages, *, position_msg_type, ...)` →
|
||||
`GcsSummaryRateReport(observed_rate_hz, passes, ...)` — AC-6.1.
|
||||
* `extract_inbound_hints(messages, *, hint_prefix='RELOC:')` →
|
||||
`list[InboundHint]` — tlog→DTO adapter.
|
||||
* `parse_reloc_payload(hint_text)` → `(lat_deg, lon_deg, radius_m)`.
|
||||
* `correlate_hint_acks(hints, acks)` → `HintAckReport` (AC-2).
|
||||
Greedy injection-order pairing; each ack matches at most one hint.
|
||||
* `evaluate_search_region_shift(regions, hint_inject_us, lat, lon)` →
|
||||
`SearchRegionShiftReport` (AC-3). Compares last pre-hint region
|
||||
centre to first post-hint region centre via haversine distance.
|
||||
* `haversine_distance_m(lat_a, lon_a, lat_b, lon_b)` — great-circle
|
||||
distance, mean Earth radius. Sub-100 km accuracy ≪ 1 m.
|
||||
* `detect_hint_rejection(messages, inject_us, *, window_us=2e6)` →
|
||||
`HintRejectionReport` (AC-4). Scans STATUSTEXT in the post-inject
|
||||
window for `BAD_SIGNATURE` / `UNAUTHORIZED` / `REJECTED` tokens.
|
||||
* `collect_messages_to_list(messages)` — convenience for the
|
||||
"parse once, run N analyzers" pattern (mirrors
|
||||
`ap_contract_evaluator`).
|
||||
|
||||
* **`e2e/runner/helpers/sitl_observer.py`** (edited, +25 lines):
|
||||
adds `capture_gcs_tlog(host, duration_s) -> Path` mirroring
|
||||
`capture_ap_tlog`. Loads the FDR-replay fixture at
|
||||
`${E2E_SITL_REPLAY_DIR}/gcs_tlog_<host>.tlog`. Raises `RuntimeError`
|
||||
on missing env / missing fixture / non-positive duration.
|
||||
|
||||
* **`e2e/tests/positive/test_ft_p_12_gcs_downsample.py`** (new,
|
||||
110 lines): full FT-P-12 scenario. Skips when `sitl_replay_ready`
|
||||
is False (no SITL fixture). Parametric across
|
||||
`(fc_adapter, vio_strategy)` via conftest. `traces_to(AC-6.1,AC-1,AC-5)`.
|
||||
|
||||
* **`e2e/tests/positive/test_ft_p_13_gcs_command.py`** (new,
|
||||
211 lines): full FT-P-13 scenario. Walks the FDR archive for
|
||||
`c8.gcs.operator_command` ack records + `anchor_search_region`
|
||||
per-frame records. Skips on missing fixture; fails loudly on
|
||||
empty hint list / empty FDR archive so the test cannot silently
|
||||
green-light an unimplemented production path.
|
||||
`traces_to(AC-6.2,AC-2,AC-3,AC-4,AC-5)`.
|
||||
|
||||
* **`e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py`** (new,
|
||||
39 tests): pure-logic coverage for every evaluator + adapter.
|
||||
Boundary cases include 1.0 / 2.0 Hz inclusive, ack-before-hint
|
||||
ignored, latency exactly at 2 000 ms, no pre-hint region, equal
|
||||
distance non-strict, BAD_SIGNATURE / UNAUTHORIZED / REJECTED
|
||||
token detection, malformed `RELOC:` payload raises `ValueError`.
|
||||
|
||||
* **`e2e/_unit_tests/helpers/test_sitl_observer.py`** (edited, +4 tests):
|
||||
`capture_gcs_tlog` happy path + missing env + missing fixture +
|
||||
zero/negative duration. Mirrors the existing `capture_ap_tlog`
|
||||
test block.
|
||||
|
||||
* **`e2e/_unit_tests/test_directory_layout.py`** (edited): registers
|
||||
`runner/helpers/gcs_telemetry_evaluator.py`,
|
||||
`tests/positive/test_ft_p_12_gcs_downsample.py`,
|
||||
`tests/positive/test_ft_p_13_gcs_command.py`.
|
||||
|
||||
## Tests
|
||||
|
||||
Full `e2e/_unit_tests/` suite: **746 passed in 147.57 s** (baseline
|
||||
700 → +46 net). Run via `python -m pytest e2e/_unit_tests/` from
|
||||
the workspace root. No flakes, no skips outside the pre-existing
|
||||
intentional skips.
|
||||
|
||||
Collection check on the two new scenario tests (`pytest
|
||||
--collect-only e2e/tests/positive/test_ft_p_12_gcs_downsample.py
|
||||
e2e/tests/positive/test_ft_p_13_gcs_command.py`): 12 items collected
|
||||
(2 tests × 6 `(fc_adapter, vio_strategy)` combinations each).
|
||||
The scenarios skip locally because `E2E_SITL_REPLAY_DIR` is unset —
|
||||
which is the intended docker-vs-host boundary; they run inside the
|
||||
docker-compose SITL replay harness.
|
||||
|
||||
Per-area test counts (this batch):
|
||||
|
||||
| File | Tests added |
|
||||
|------|-------------|
|
||||
| `test_gcs_telemetry_evaluator.py` (new) | 39 |
|
||||
| `test_sitl_observer.py` (edited) | 4 |
|
||||
| `test_directory_layout.py` (edited) | 3 (path entries) |
|
||||
| `test_no_sut_imports.py` (no edit; broader walk) | implicit +1 module covered |
|
||||
| **Total** | **+46** |
|
||||
|
||||
## Acceptance Criteria Verification
|
||||
|
||||
| AC | Status | Evidence |
|
||||
|-----|--------|----------|
|
||||
| AC-1 — GCS rate ∈ [1, 2] Hz over 60 s window | ✓ | `test_ft_p_12_gcs_downsample` + 10 `compute_gcs_summary_rate` unit tests (boundary, degeneracy, custom bounds) |
|
||||
| AC-2 — FDR ack ≤ 2 s after inject | ✓ | `test_ft_p_13_gcs_command` + 6 `correlate_hint_acks` unit tests |
|
||||
| AC-3 — `anchor_search_region` shifts toward hint | ✓ | `test_ft_p_13_gcs_command` + 5 `evaluate_search_region_shift` + 3 `haversine_distance_m` unit tests |
|
||||
| AC-4 — No security/auth rejection in window | ✓ | `test_ft_p_13_gcs_command` + 7 `detect_hint_rejection` unit tests |
|
||||
| AC-5 — Parameterised per `(fc_adapter, vio_strategy)` | ✓ | `pytest --collect-only` shows 6 param IDs per scenario |
|
||||
|
||||
## Code Review Verdict
|
||||
PASS_WITH_WARNINGS (no Critical, no High; 2 Low notes — see
|
||||
`reviews/batch_81_review.md`).
|
||||
|
||||
## Auto-Fix Attempts
|
||||
0 (no auto-fix-eligible findings).
|
||||
|
||||
## Stuck Agents
|
||||
None.
|
||||
|
||||
## Notable Decisions
|
||||
|
||||
* **`HintAckReport.passes` returns False for empty hints.** The
|
||||
scenario test pre-checks `if not hints: pytest.fail(...)` before
|
||||
calling `correlate_hint_acks`, so the evaluator never observes
|
||||
an empty list in practice. Leaving the conservative semantic in
|
||||
place — "no hints" is a misuse of the correlator, not a trivial
|
||||
pass — and pushing the explicit failure upstream where the
|
||||
contextual error message ("the fixture builder must inject at
|
||||
least one operator re-loc hint") is more useful.
|
||||
* **AC-3's `passes` is non-strict shift.** A region exactly
|
||||
equidistant before/after the inject is treated as "not biased"
|
||||
(`distance_after_m < distance_before_m` is strict). This matches
|
||||
the spec wording "shifts toward the hinted location" — zero
|
||||
movement is not a shift. Documented in
|
||||
`SearchRegionShiftReport.passes`.
|
||||
* **Counted `GLOBAL_POSITION_INT` only for AC-6.1, not the
|
||||
`NAMED_VALUE_FLOAT` companion.** The QGC adapter pairs them so
|
||||
counting both would double-count. The position message is the
|
||||
contract-relevant half; the NAMED_VALUE_FLOAT carries the decorative
|
||||
horizontal-uncertainty annotation.
|
||||
* **Tests are shaped to fail loudly when the upstream production
|
||||
hooks are missing.** AC-2 requires the C8 adapter to translate an
|
||||
inbound STATUSTEXT into an FDR `c8.gcs.operator_command` record;
|
||||
AC-3 requires the C2 backbone to emit `anchor_search_region` FDR
|
||||
records. Both are deferred work outside AZ-420's scope. The
|
||||
scenario tests skip cleanly when no fixture is present
|
||||
(`sitl_replay_ready=False`) and fail with a specific error when
|
||||
the fixture exists but lacks the expected hint or ack records.
|
||||
This is the "tests as gates" pattern called out in the implement
|
||||
skill.
|
||||
|
||||
## Production Dependencies (forward-look)
|
||||
|
||||
FT-P-13 transitively depends on:
|
||||
|
||||
* **Inbound STATUSTEXT command parser** in
|
||||
`c8_fc_adapter/mavlink_gcs_adapter.py`. Currently the adapter emits
|
||||
but does not consume STATUSTEXT. The C12
|
||||
`MavlinkOperatorCommandTransport` concrete impl is a Protocol-only
|
||||
stub.
|
||||
* **`anchor_search_region` FDR record** emitted by the C2 backbone
|
||||
per nav-camera frame. The FDR schema (AC-NEW-3 family) reserves
|
||||
the slot but no producer wires it.
|
||||
|
||||
These gaps are surfaced (not silently absorbed) by the scenario
|
||||
tests when the fixture builder produces a tlog without the
|
||||
corresponding fixtures. They will be picked up by future production
|
||||
implementation tasks; AZ-420 owns the test surface only.
|
||||
|
||||
## Out of Scope (deferred)
|
||||
|
||||
* Spoofed-GPS escalation STATUSTEXT path — owned by FT-N-04 (AZ-426).
|
||||
* Operator-reloc-request emission negative-path — owned by FT-N-03
|
||||
(AZ-425).
|
||||
* The fixture builder's actual `gcs_tlog_<host>.tlog` synthesis (with
|
||||
`RELOC:` injection + corresponding FDR `c8.gcs.operator_command`
|
||||
ack + `anchor_search_region` records) — owned by AZ-595.
|
||||
|
||||
## Next Batch
|
||||
|
||||
Batch 82 candidates from `_docs/02_tasks/todo/` (21 tasks remaining):
|
||||
AZ-421 (FT-P-14), AZ-422 (FT-P-15), AZ-423 (FT-N-01), AZ-424
|
||||
(FT-N-02). Topo-order leader is AZ-421. Pick at next `/autodev`
|
||||
invocation per implement-skill rules (≤ 4 tasks, ≤ 20 cp).
|
||||
@@ -0,0 +1,216 @@
|
||||
# Code Review Report
|
||||
|
||||
**Batch**: 81 — AZ-420 (FT-P-12 GCS downsample + FT-P-13 GCS command path)
|
||||
**Date**: 2026-05-17
|
||||
**Verdict**: PASS_WITH_WARNINGS
|
||||
|
||||
## Findings
|
||||
|
||||
| # | Severity | Category | File:Line | Title |
|
||||
|----|----------|---------------|--------------------------------------------------------------------|--------------------------------------------------------|
|
||||
| 1 | Low | Scope | `e2e/runner/helpers/gcs_telemetry_evaluator.py` | `HintAckReport.passes` returns False for empty hints |
|
||||
| 2 | Low | Maintainability | `e2e/tests/positive/test_ft_p_13_gcs_command.py:114` | FDR records loaded twice if regions list is long |
|
||||
|
||||
### Finding Details
|
||||
|
||||
**F1: `HintAckReport.passes` returns False when no hints supplied** (Low / Scope)
|
||||
|
||||
- Location: `e2e/runner/helpers/gcs_telemetry_evaluator.py:205-210`
|
||||
- Description: `passes` returns `False` if the hint list is empty.
|
||||
The scenario test pre-checks `if not hints: pytest.fail(...)` before
|
||||
calling `correlate_hint_acks`, so this branch is never reached in
|
||||
practice. But a future caller could be surprised — "no hints =
|
||||
trivially pass" is arguably the more defensible default for a
|
||||
pure evaluator.
|
||||
- Suggestion: leave as-is; the explicit upstream `pytest.fail` is
|
||||
cleaner than overloading the evaluator's semantics. Documented in
|
||||
the dataclass docstring.
|
||||
- Task: AZ-420
|
||||
|
||||
**F2: FDR record loop appends to two lists in one pass** (Low / Maintainability)
|
||||
|
||||
- Location: `e2e/tests/positive/test_ft_p_13_gcs_command.py:117-137`
|
||||
- Description: The test walks the FDR archive once and appends to
|
||||
both `acks` and `regions`. The if/elif keeps the walk O(n), but
|
||||
the branch ordering makes the test harder to scan when a future
|
||||
contributor adds a third record type.
|
||||
- Suggestion: defer until a third record type is needed; splitting
|
||||
prematurely adds two loops for no current benefit.
|
||||
- Task: AZ-420
|
||||
|
||||
## Findings Sweep
|
||||
|
||||
### Phase 1 — Context Loading
|
||||
|
||||
Read AZ-420 spec, project restrictions, module-layout, blackbox-tests
|
||||
docs (FT-P-12 / FT-P-13 sections), and the previously implemented
|
||||
templates (`test_ft_p_02_derkachi_drift.py`, `test_ft_p_09_ap_signing.py`)
|
||||
to inventory the test patterns and fixture surface. Reviewed
|
||||
`mavlink_gcs_adapter.py` to understand the SUT's outbound summary
|
||||
shape (`GLOBAL_POSITION_INT` + `NAMED_VALUE_FLOAT`) — only the
|
||||
position message is counted for AC-6.1 to avoid double-counting the
|
||||
decorative companion.
|
||||
|
||||
### Phase 2 — Spec Compliance (AC trace)
|
||||
|
||||
* **AC-1** (FT-P-12 GCS rate ∈ `[1, 2]` Hz) ✓
|
||||
- Scenario: `test_ft_p_12_gcs_downsample` calls
|
||||
`compute_gcs_summary_rate` and asserts `report.passes`.
|
||||
- Pure-logic coverage: 10 tests in `test_gcs_telemetry_evaluator.py`
|
||||
(window bounds, boundary 1.0/2.0/inclusive, single-message
|
||||
degeneracy, identical-timestamps, non-position filtering, custom
|
||||
bounds, invalid bounds → `ValueError`).
|
||||
|
||||
* **AC-2** (FT-P-13 hint ack ≤ 2 s via FDR) ✓
|
||||
- Scenario: `test_ft_p_13_gcs_command` calls `correlate_hint_acks`
|
||||
and asserts `ack_report.passes`.
|
||||
- Pure-logic coverage: 6 tests (single-hint single-ack, multi-hint
|
||||
greedy pairing, ack-before-hint ignored, latency exactly at
|
||||
boundary, missing ack → `passes = False`, empty hints).
|
||||
|
||||
* **AC-3** (FT-P-13 search prior bias) ✓
|
||||
- Scenario: `test_ft_p_13_gcs_command` calls
|
||||
`evaluate_search_region_shift` against `anchor_search_region` FDR
|
||||
records and asserts `shift_report.passes`.
|
||||
- Pure-logic coverage: 5 shift tests + 3 haversine sanity tests
|
||||
(no pre-hint region, no post-hint region, shift toward hint,
|
||||
drift away from hint, equal distance — non-strict comparison
|
||||
documented).
|
||||
|
||||
* **AC-4** (FT-P-13 no rejection) ✓
|
||||
- Scenario: `test_ft_p_13_gcs_command` calls
|
||||
`detect_hint_rejection` and asserts
|
||||
`rejection_report.passes`.
|
||||
- Pure-logic coverage: 7 tests (no STATUSTEXT, rejection
|
||||
inside window, rejection outside window, case-insensitive
|
||||
token match, BAD_SIGNATURE / UNAUTHORIZED / REJECTED tokens,
|
||||
invalid `window_us` → `ValueError`).
|
||||
|
||||
* **AC-5** (parameterisation) ✓
|
||||
- `pytest --collect-only` confirms 6 param IDs per scenario:
|
||||
`[ardupilot|inav]-[okvis2|klt_ransac|vins_mono]`. Both tests
|
||||
accept `fc_adapter` + `vio_strategy` fixtures via conftest.
|
||||
|
||||
### Phase 3 — Code Quality
|
||||
|
||||
* SRP: `gcs_telemetry_evaluator.py` owns four independent evaluators
|
||||
(`compute_gcs_summary_rate`, `correlate_hint_acks`,
|
||||
`evaluate_search_region_shift`, `detect_hint_rejection`) + two
|
||||
tlog→DTO adapters (`extract_inbound_hints`, `parse_reloc_payload`).
|
||||
Each function has one reason to change. ✓
|
||||
* No silent error suppression: invalid bounds raise `ValueError`
|
||||
with a message naming the offending value (`min_required_hz must
|
||||
be ≥0, got -1`); malformed payload parses raise `ValueError` with
|
||||
the raw text (`hint payload must have 3 comma-separated fields...`);
|
||||
ack correlation has no try/except. ✓
|
||||
* No code comments narrating mechanics; only docstrings + a one-line
|
||||
comment on the greedy-pairing intent ("keep moving forward to find
|
||||
the last pre-hint"). Tests use AAA pattern. ✓
|
||||
* Function complexity: longest is `evaluate_search_region_shift` at
|
||||
35 lines including the dataclass-construction tail. All under the
|
||||
50-line / cyclomatic-10 threshold. ✓
|
||||
* Naming: `inject_timestamp_us`, `ack_timestamp_us`, `distance_after_m`,
|
||||
`passes` — units are in the names; no `data` / `item` / `candidate`
|
||||
vagueness. ✓
|
||||
|
||||
### Phase 4 — Security Quick-Scan
|
||||
|
||||
* No SQL, no `shell=True`, no `eval`, no `exec`. ✓
|
||||
* No hardcoded secrets; no API keys. ✓
|
||||
* Input validation: `parse_reloc_payload` validates field count and
|
||||
float parsing before returning; `compute_gcs_summary_rate`
|
||||
validates rate bounds; `detect_hint_rejection` validates
|
||||
`window_us > 0`. ✓
|
||||
* No sensitive data in logs (no log statements in helper). ✓
|
||||
|
||||
### Phase 5 — Performance
|
||||
|
||||
* `compute_gcs_summary_rate`: O(n) over messages, one materialisation
|
||||
into `timestamps`. ✓
|
||||
* `correlate_hint_acks`: O(n log n) ack sort + single linear pass
|
||||
with greedy cursor. ✓
|
||||
* `evaluate_search_region_shift`: O(r) single pass over regions. ✓
|
||||
* `detect_hint_rejection`: O(m) single pass over messages with early
|
||||
filter on `msg_type`. ✓
|
||||
* No blocking I/O in async contexts (no async here). ✓
|
||||
* `collect_messages_to_list` materialises the tlog iterator once;
|
||||
scenarios then run 3 analyzers over the result without re-parsing —
|
||||
same pattern as `ap_contract_evaluator`. ✓
|
||||
|
||||
### Phase 6 — Cross-Task Consistency
|
||||
|
||||
* `capture_gcs_tlog` mirrors `capture_ap_tlog` exactly: same
|
||||
signature `(host: str, duration_s: float) -> Path`, same env-var
|
||||
resolution (`E2E_SITL_REPLAY_DIR`), same RuntimeError messaging
|
||||
pattern, same `duration_s > 0` precondition. ✓
|
||||
* `traces_to` marker format matches FT-P-09 / FT-P-02 conventions
|
||||
(`AC-6.1,AC-1,AC-5` — top-level NFR + per-AC IDs comma-separated). ✓
|
||||
* Fixture naming follows `<artifact>_<host>.tlog` (matches existing
|
||||
`ap_tlog_<host>.tlog` next to it). ✓
|
||||
|
||||
### Phase 7 — Architecture Compliance
|
||||
|
||||
Inputs: `_docs/02_document/module-layout.md` (Blackbox Tests owns
|
||||
`e2e/**`); changed files all under `e2e/`.
|
||||
|
||||
1. **Layer direction**: all imports inside `e2e/` reference
|
||||
`runner.helpers.*` (same component). No imports of
|
||||
`src/gps_denied_onboard.*`. Verified by
|
||||
`test_no_sut_imports.py` (PASS). ✓
|
||||
2. **Public API respect**: `gcs_telemetry_evaluator` imports only
|
||||
`runner.helpers.mavproxy_tlog_reader.TlogMessage` (a sibling
|
||||
helper); scenario tests import only from `runner.helpers.*`
|
||||
and stdlib. No cross-component imports. ✓
|
||||
3. **No new cyclic dependencies**: `gcs_telemetry_evaluator` →
|
||||
`mavproxy_tlog_reader`; no back-edge from reader to evaluator.
|
||||
Scenario tests are leaf modules (nothing imports them). ✓
|
||||
4. **Duplicate symbols**: no class/function/constant in the new
|
||||
helper duplicates an existing symbol anywhere in `e2e/`.
|
||||
`compute_gcs_summary_rate` is the GCS-summary-rate counterpart
|
||||
to `ap_contract_evaluator.compute_gps_input_rate` but is named
|
||||
differently and operates on a distinct message type
|
||||
(`GLOBAL_POSITION_INT` vs. `GPS_INPUT`). ✓
|
||||
5. **Cross-cutting concerns**: haversine math is local to this
|
||||
helper. Project does not yet have a shared geo-math module; one
|
||||
helper instance is acceptable until a second consumer appears
|
||||
(e.g. FT-N-04 spoof detection might want it). Noted for future
|
||||
refactor; not flagged as a finding. ✓
|
||||
|
||||
## Production Dependencies (forward-look)
|
||||
|
||||
FT-P-13 (AC-2 / AC-3) transitively depends on two production
|
||||
capabilities that are documented as deferred work:
|
||||
|
||||
* **Inbound STATUSTEXT command parser** in
|
||||
`c8_fc_adapter/mavlink_gcs_adapter.py` (currently emits but does
|
||||
not consume). The C12 `MavlinkOperatorCommandTransport` concrete
|
||||
implementation is a Protocol-only stub today.
|
||||
* **`anchor_search_region` FDR record** emitted by the C2 backbone
|
||||
per nav-camera frame. The FDR schema (AC-NEW-3 family) reserves
|
||||
the slot, but no producer wires it yet.
|
||||
|
||||
Both gaps are tracked outside AZ-420 — the test is shaped so it
|
||||
exercises these capabilities the moment they land, and skips
|
||||
cleanly (via `sitl_replay_ready`) or fails loudly (via the
|
||||
explicit `pytest.fail` on empty hint list / empty FDR archive)
|
||||
otherwise. This is the "tests as gates" pattern endorsed by the
|
||||
implement skill.
|
||||
|
||||
## Regression Gate
|
||||
|
||||
Full `e2e/_unit_tests/` suite: **746 passed in 147.57 s**, single run,
|
||||
no flakes. Up from 700 (batch 80 baseline) by +46:
|
||||
|
||||
* +39 in new `test_gcs_telemetry_evaluator.py` (10 rate, 6 ack-corr,
|
||||
3 haversine, 5 shift, 7 rejection, 4 extract-hints, 3 parse-payload,
|
||||
1 collect_messages_to_list).
|
||||
* +4 in `test_sitl_observer.py` (`capture_gcs_tlog` happy path +
|
||||
missing env + missing fixture + zero/negative duration).
|
||||
* +2 in `test_directory_layout.py` (new helper module + 2 new scenario
|
||||
tests under positive/).
|
||||
* +1 net from a `test_no_sut_imports.py` walk that now covers the
|
||||
new helper.
|
||||
|
||||
No tests removed; no tests skipped under normal CI execution; the
|
||||
two new scenarios skip locally because `E2E_SITL_REPLAY_DIR` is
|
||||
unset, which is the intended container-vs-host boundary.
|
||||
@@ -0,0 +1,209 @@
|
||||
# Cumulative Code Review — Batches 79, 80, 81
|
||||
|
||||
**Date**: 2026-05-17
|
||||
**Verdict**: PASS
|
||||
|
||||
Covers the arc:
|
||||
|
||||
* **Batch 79 / AZ-599** — FT-P-02 Derkachi vertical-slice fixture
|
||||
builder + `_common.py` extraction (factored out shared helpers
|
||||
from the b78 FT-P-01 builder).
|
||||
* **Batch 80 / AZ-600** — refactor `_common.py` + `build_p01_fixtures.py`
|
||||
+ `build_p02_fixtures.py` into a parameterised strategy framework
|
||||
(`builder.py`).
|
||||
* **Batch 81 / AZ-420** — FT-P-12 (GCS downsample) + FT-P-13 (GCS
|
||||
command path) scenarios with a new `gcs_telemetry_evaluator.py`
|
||||
helper + `sitl_observer.capture_gcs_tlog`.
|
||||
|
||||
The three batches together close two unrelated arcs:
|
||||
|
||||
1. **Fixture-builder consolidation**: the b78 FT-P-01 vertical slice
|
||||
landed as a one-off builder; b79 added a second one-off (FT-P-02)
|
||||
plus an `_common.py` extraction; b80 then collapsed both into a
|
||||
strategy-pattern framework where future scenarios compose existing
|
||||
strategies. Net result: a single durable `builder.py` instead of
|
||||
N per-scenario builders.
|
||||
2. **GCS-leg blackbox coverage**: b81 adds the FT-P-12 and FT-P-13
|
||||
scenarios that exercise the operator-facing telemetry stream
|
||||
and command path — orthogonal to the fixture-builder work and
|
||||
the first scenario pair that probes the C8 `QgcTelemetryAdapter`.
|
||||
|
||||
## Cross-Cutting Themes
|
||||
|
||||
### Convergence on a single helper / evaluator shape
|
||||
|
||||
Both b80's `builder.py` strategies and b81's
|
||||
`gcs_telemetry_evaluator.py` follow the same convention seen across
|
||||
the wider runner.helpers package:
|
||||
|
||||
1. **Pure-logic dataclasses** (`FixtureBuilderConfig`,
|
||||
`GcsSummaryRateReport`, `HintAckReport`, `SearchRegionShiftReport`,
|
||||
`HintRejectionReport`, `InboundHint`, `FdrCommandAck`,
|
||||
`SearchRegionRecord`) — `@dataclass(frozen=True)` with a `passes`
|
||||
property where the dataclass is an evaluation result.
|
||||
2. **`Iterable[TlogMessage]` ingress** — b81's four evaluators all
|
||||
accept `Iterable[TlogMessage]` instead of a Sequence so they
|
||||
compose with `mavproxy_tlog_reader.iter_messages` without
|
||||
materialisation. The scenario test materialises once via
|
||||
`collect_messages_to_list` and reuses for multiple analyzers
|
||||
(mirrors `ap_contract_evaluator.collect_messages_to_list` from b78).
|
||||
3. **No `src/gps_denied_onboard` imports** — `e2e/_unit_tests/
|
||||
test_no_sut_imports.py` walks the entire `e2e/` tree and
|
||||
passes across all three batches.
|
||||
|
||||
### Fixture-naming convention
|
||||
|
||||
The artifact naming pattern `<artifact>_<host>.tlog` is consistent
|
||||
across batches:
|
||||
|
||||
* `ap_tlog_<host>.tlog` (pre-existing, AP/SITL-side)
|
||||
* `gcs_tlog_<host>.tlog` (b81, GCS-side)
|
||||
|
||||
Same env var (`E2E_SITL_REPLAY_DIR`), same observer surface
|
||||
(`sitl_observer.capture_ap_tlog` / `capture_gcs_tlog`), same
|
||||
RuntimeError messaging convention ("RuntimeError: capture_X: env var
|
||||
not set", "fixture not found at PATH").
|
||||
|
||||
### Strategy-pattern reuse readiness
|
||||
|
||||
B80's `builder.py` factored four ABCs (`VideoSource`, `TlogSource`,
|
||||
`FdrProjection`) + six concrete impls. B81 was the first scenario
|
||||
batch after b80; it consumes no new strategy from `builder.py`
|
||||
(FT-P-12/13 are runtime tests, not fixture-builder scenarios), so
|
||||
the strategy framework's reuse is still latent. The next builder
|
||||
work (any of FT-P-04/05/07/08/10/11) will be the actual test of
|
||||
b80's refactor — they should each land as a ~30-line config
|
||||
factory, not a new builder module.
|
||||
|
||||
## Phase-by-Phase Findings
|
||||
|
||||
### Phase 1 — Context Loading
|
||||
|
||||
Read all three batch reports, all three per-batch reviews, and the
|
||||
task specs AZ-599, AZ-600, AZ-420. Mapped changed files to tasks via
|
||||
the batch reports' "Tasks" sections.
|
||||
|
||||
### Phase 2 — Spec Compliance (cumulative)
|
||||
|
||||
All three per-batch reviews already verified ACs against tests.
|
||||
Re-walking AC coverage across the union:
|
||||
|
||||
* **AZ-599 (FT-P-02 builder)**: 7 scenario integration tests
|
||||
(test_sitl_replay_builder_p02.py). Behavior preserved through
|
||||
b80's refactor (same tests now in slimmed file, plus new
|
||||
strategy-level coverage in test_sitl_replay_builder_builder.py).
|
||||
* **AZ-600 (strategy refactor)**: 33 strategy-level tests + 6 + 7
|
||||
scenario tests = 46 total across the three files.
|
||||
* **AZ-420 (FT-P-12+13)**: 39 evaluator + adapter unit tests + 2
|
||||
scenario tests (skip locally; collect 12 param IDs total).
|
||||
|
||||
No AC went from covered → uncovered between batches.
|
||||
|
||||
### Phase 3 — Code Quality (cumulative)
|
||||
|
||||
* **SRP across batches**: b80 splits orchestration (`build_fixtures`)
|
||||
from materialisation (each strategy) — clean. B81's
|
||||
`gcs_telemetry_evaluator.py` keeps four independent evaluator
|
||||
functions in one module — defensible because they all consume
|
||||
the same tlog stream and the same FDR record family; splitting
|
||||
would just shuffle imports.
|
||||
* **Naming consistency**: timestamps end in `_us` or `_ms` everywhere;
|
||||
distances end in `_m`; rates end in `_hz`; coordinates end in
|
||||
`_deg`. Units in the name, not in comments. ✓
|
||||
* **Test AAA pattern**: spot-checked 10 random tests across all three
|
||||
batches. All use `# Arrange / # Act / # Assert` comments per the
|
||||
coding rules. ✓
|
||||
* **No comment narration of mechanics**: only docstrings + one-liners
|
||||
on non-obvious invariants (`STATIONARY_Z_ACCEL_MG` gravity encoding
|
||||
in b80; the greedy-pairing "keep moving forward to find the last
|
||||
pre-hint" in b81). ✓
|
||||
|
||||
### Phase 4 — Security Quick-Scan (cumulative)
|
||||
|
||||
* No SQL, no `shell=True`, no `eval`/`exec`. ✓
|
||||
* No hardcoded secrets/API keys. ✓
|
||||
* `subprocess.run` in `builder.py`'s `run_gps_denied_replay` uses a
|
||||
list-form `cmd`, not shell-interpolation. ✓
|
||||
* Tlog/FDR record parsing wraps `int()` / `float()` calls with
|
||||
ValueError re-raise messaging on the offending payload (b81's
|
||||
`parse_reloc_payload`, b80's `ImuCsvTlog`). ✓
|
||||
|
||||
### Phase 5 — Performance (cumulative)
|
||||
|
||||
* `build_fixtures` is bound by subprocess (`run_gps_denied_replay`)
|
||||
wall-clock, not in-process compute. Per-strategy materialisation
|
||||
is O(n) over input rows/frames. ✓
|
||||
* `gcs_telemetry_evaluator` is all O(n) over messages with a single
|
||||
O(n log n) ack sort. ✓
|
||||
* No new blocking I/O introduced. No new `time.sleep` polling loops. ✓
|
||||
|
||||
### Phase 6 — Cross-Task Consistency
|
||||
|
||||
* **Interface compatibility across batches**: b80 removed
|
||||
`_common.py`; b81 doesn't reference it. The only b81-side import
|
||||
from the fixture-builder area is the implicit fixture file path
|
||||
pattern, which is unchanged. No drift. ✓
|
||||
* **Duplicate symbols**: `compute_gcs_summary_rate` (b81) and
|
||||
`compute_gps_input_rate` (pre-existing
|
||||
`ap_contract_evaluator`) compute the same `(N-1)/window`
|
||||
rate over different message types — same algorithm, different
|
||||
AC. The fact they're separate functions is correct (they have
|
||||
different domain semantics). If a third "rate over message
|
||||
type X" appears, then a generic helper would be warranted. ✓
|
||||
* **Shared logging / config**: no batch introduced a new logging or
|
||||
config loader; the existing `runner.helpers.replay_mode` env-var
|
||||
helpers are reused. ✓
|
||||
|
||||
### Phase 7 — Architecture Compliance
|
||||
|
||||
`_docs/02_document/module-layout.md` declares Blackbox Tests as the
|
||||
sole owner of `e2e/**`.
|
||||
|
||||
1. **Layer direction**: all imports across the 23 changed files
|
||||
resolve within `e2e/` (`runner.helpers.*`, `e2e.fixtures.*`,
|
||||
stdlib, pytest, mavlink/pymavlink, cv2 in the b80 builder). No
|
||||
imports of `src/gps_denied_onboard.*`. ✓
|
||||
2. **Public API respect**: all cross-helper imports go through
|
||||
module-level public names; no `_private` symbol crossed a module
|
||||
boundary. ✓
|
||||
3. **No new cyclic deps**: b80's `builder.py` is a leaf consumed by
|
||||
`build_p01_fixtures.py` and `build_p02_fixtures.py`; b81's
|
||||
`gcs_telemetry_evaluator.py` is a leaf consumed by the two
|
||||
scenario tests. No back-edges. ✓
|
||||
4. **No duplicate symbols across components**: there is only one
|
||||
component touched (Blackbox Tests), so this check is vacuous
|
||||
except internally — covered by Phase 6.
|
||||
5. **Cross-cutting concerns**: haversine math (b81) is local. The
|
||||
project does not yet have a shared geo-math module; if a second
|
||||
consumer appears (e.g., spoof-distance check in FT-N-04), promote
|
||||
to `runner/helpers/geo_math.py`. Tracked here, not flagged. ✓
|
||||
|
||||
## Findings
|
||||
|
||||
(None new at cumulative scope.)
|
||||
|
||||
The two Low/Note findings from `batch_81_review.md`
|
||||
(`HintAckReport.passes` empty-hints semantic; FDR record loop branch
|
||||
ordering) remain as documented; both are scoped to b81 alone and
|
||||
neither is exacerbated by viewing b79/b80/b81 jointly. Carry over
|
||||
to the next cumulative cycle if a future batch touches the same
|
||||
files.
|
||||
|
||||
## Regression Gate (cumulative)
|
||||
|
||||
Full `e2e/_unit_tests/` suite: **746 passing in 147.57 s**.
|
||||
|
||||
Baseline history across the arc:
|
||||
|
||||
| Cumulative window | Suite count | Δ |
|
||||
|-------------------|-------------|---|
|
||||
| Pre-batch-79 | ~625 | |
|
||||
| Post-batch-79 | 686 | +61 (FT-P-02 builder scenario tests) |
|
||||
| Post-batch-80 | 700 | +14 (strategy reorganisation; finer-grained per-strategy tests) |
|
||||
| Post-batch-81 | 746 | +46 (FT-P-12/13 unit + scenario fixtures) |
|
||||
|
||||
No tests removed without an equivalent replacement
|
||||
(`test_common_module_exports_used_by_b01` retired in b80 because
|
||||
the "shared helper" pattern it pinned is now structural rather than
|
||||
file-location-based). No flakes observed across the three full-suite
|
||||
runs that closed each batch.
|
||||
@@ -12,9 +12,9 @@ sub_step:
|
||||
retry_count: 0
|
||||
cycle: 1
|
||||
tracker: jira
|
||||
last_completed_batch: 80
|
||||
last_cumulative_review: batches_76-78
|
||||
current_batch: 81
|
||||
last_completed_batch: 81
|
||||
last_cumulative_review: batches_79-81
|
||||
current_batch: 82
|
||||
|
||||
last_step_outcomes:
|
||||
step_8: "Code is testable — no changes needed (testability_assessment.md committed; no list-of-changes, no source edits)"
|
||||
|
||||
@@ -0,0 +1,549 @@
|
||||
"""Unit tests for ``runner.helpers.gcs_telemetry_evaluator`` (AZ-420).
|
||||
|
||||
The pure-logic AC-6.1 / AC-6.2 coverage scenarios for FT-P-12 + FT-P-13.
|
||||
The full e2e scenarios in ``e2e/tests/positive/test_ft_p_1[23]_*.py``
|
||||
exercise the same helpers end-to-end when ``E2E_SITL_REPLAY_DIR`` is
|
||||
prepared; this file covers the helpers in isolation so AC verification
|
||||
does not depend on the SITL fixture.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
from runner.helpers import gcs_telemetry_evaluator as gte
|
||||
from runner.helpers.mavproxy_tlog_reader import TlogMessage
|
||||
|
||||
|
||||
def _gpi(timestamp_us: int) -> TlogMessage:
|
||||
"""Construct a minimal GLOBAL_POSITION_INT TlogMessage for tests."""
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="GLOBAL_POSITION_INT",
|
||||
signed=True,
|
||||
fields={"lat": 0, "lon": 0, "alt": 0},
|
||||
)
|
||||
|
||||
|
||||
def _nvf(timestamp_us: int) -> TlogMessage:
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="NAMED_VALUE_FLOAT",
|
||||
signed=True,
|
||||
fields={"name": b"horiz_m", "value": 7.5},
|
||||
)
|
||||
|
||||
|
||||
def _statustext(timestamp_us: int, text: str) -> TlogMessage:
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="STATUSTEXT",
|
||||
signed=False,
|
||||
fields={"severity": 4, "text": text},
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────── compute_gcs_summary_rate ───────────────────
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_passes_within_band() -> None:
|
||||
# Arrange: 60 GLOBAL_POSITION_INT at 1.5 Hz over 60 s.
|
||||
interval_us = int(1_000_000 / 1.5)
|
||||
msgs = [_gpi(i * interval_us) for i in range(91)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 1.5, abs_tol=1e-3)
|
||||
assert report.total_summary_messages == 91
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_fails_below_band() -> None:
|
||||
# Arrange: 0.5 Hz cadence over 60 s.
|
||||
interval_us = 2_000_000
|
||||
msgs = [_gpi(i * interval_us) for i in range(31)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 0.5, abs_tol=1e-3)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_fails_above_band() -> None:
|
||||
# Arrange: 5 Hz cadence (matches the un-downsampled emit_summary).
|
||||
interval_us = 200_000
|
||||
msgs = [_gpi(i * interval_us) for i in range(301)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 5.0, abs_tol=1e-3)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_ignores_companion_named_value_float() -> None:
|
||||
# Arrange: interleave NAMED_VALUE_FLOAT companions; they MUST NOT be
|
||||
# counted as separate summary bursts (avoids double-counting).
|
||||
interval_us = int(1_000_000 / 1.5)
|
||||
msgs = [_gpi(i * interval_us) for i in range(91)]
|
||||
msgs.extend(_nvf(i * interval_us + 1) for i in range(91))
|
||||
msgs.sort(key=lambda m: m.timestamp_us)
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 91
|
||||
assert math.isclose(report.observed_rate_hz, 1.5, abs_tol=1e-3)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_handles_empty_input() -> None:
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate([])
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 0
|
||||
assert report.window_us == 0
|
||||
assert report.observed_rate_hz == 0.0
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_handles_single_message() -> None:
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate([_gpi(0)])
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 1
|
||||
assert report.window_us == 0
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_rejects_negative_min_hz() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="min_required_hz must be ≥0"):
|
||||
gte.compute_gcs_summary_rate([_gpi(0)], min_required_hz=-1.0)
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_rejects_inverted_band() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="max_required_hz"):
|
||||
gte.compute_gcs_summary_rate([_gpi(0)], min_required_hz=2.0, max_required_hz=1.0)
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_accepts_boundary_min() -> None:
|
||||
# Arrange: exactly 1 Hz.
|
||||
msgs = [_gpi(i * 1_000_000) for i in range(11)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 1.0, abs_tol=1e-6)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_accepts_boundary_max() -> None:
|
||||
# Arrange: exactly 2 Hz.
|
||||
msgs = [_gpi(i * 500_000) for i in range(21)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 2.0, abs_tol=1e-6)
|
||||
assert report.passes
|
||||
|
||||
|
||||
# ─────────────────── extract_inbound_hints ───────────────────
|
||||
|
||||
|
||||
def test_extract_inbound_hints_finds_reloc_prefix() -> None:
|
||||
# Arrange
|
||||
msgs = [
|
||||
_statustext(1_000_000, "RELOC:50.0,36.0,200"),
|
||||
_statustext(2_000_000, "EKF position alert"),
|
||||
_statustext(3_000_000, "RELOC:50.1,36.1,250"),
|
||||
]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs)
|
||||
|
||||
# Assert
|
||||
assert [h.inject_timestamp_us for h in hints] == [1_000_000, 3_000_000]
|
||||
assert hints[0].hint_text == "RELOC:50.0,36.0,200"
|
||||
|
||||
|
||||
def test_extract_inbound_hints_ignores_non_statustext() -> None:
|
||||
# Arrange
|
||||
msgs = [_gpi(0), _nvf(1_000_000), _statustext(2_000_000, "RELOC:1,2,3")]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs)
|
||||
|
||||
# Assert
|
||||
assert len(hints) == 1
|
||||
assert hints[0].inject_timestamp_us == 2_000_000
|
||||
|
||||
|
||||
def test_extract_inbound_hints_honors_custom_prefix() -> None:
|
||||
# Arrange
|
||||
msgs = [
|
||||
_statustext(1_000_000, "HINT:50,36,200"),
|
||||
_statustext(2_000_000, "RELOC:50,36,200"),
|
||||
]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs, hint_prefix="HINT:")
|
||||
|
||||
# Assert
|
||||
assert len(hints) == 1
|
||||
assert hints[0].hint_text == "HINT:50,36,200"
|
||||
|
||||
|
||||
# ─────────────────── parse_reloc_payload ───────────────────
|
||||
|
||||
|
||||
def test_parse_reloc_payload_returns_triplet() -> None:
|
||||
# Assert
|
||||
assert gte.parse_reloc_payload("RELOC:50.0,36.0,200.5") == (50.0, 36.0, 200.5)
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_wrong_prefix() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="does not start with 'RELOC:'"):
|
||||
gte.parse_reloc_payload("HINT:50,36,200")
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_wrong_field_count() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="3 comma-separated fields"):
|
||||
gte.parse_reloc_payload("RELOC:50,36")
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_non_float_fields() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="must be floats"):
|
||||
gte.parse_reloc_payload("RELOC:north,east,200")
|
||||
|
||||
|
||||
# ─────────────────── correlate_hint_acks ───────────────────
|
||||
|
||||
|
||||
def test_correlate_hint_acks_pairs_in_injection_order() -> None:
|
||||
# Arrange
|
||||
hints = (
|
||||
gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:50,36,200"),
|
||||
gte.InboundHint(inject_timestamp_us=5_000_000, hint_text="RELOC:51,37,200"),
|
||||
)
|
||||
acks = (
|
||||
gte.FdrCommandAck(ack_timestamp_us=2_500_000, payload_kv={"command": "STATUSTEXT", "i": 0}),
|
||||
gte.FdrCommandAck(ack_timestamp_us=6_500_000, payload_kv={"command": "STATUSTEXT", "i": 1}),
|
||||
)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 2
|
||||
assert report.latencies_ms == (1500.0, 1500.0)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_marks_missing_ack_as_none() -> None:
|
||||
# Arrange
|
||||
hints = (gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks: tuple[gte.FdrCommandAck, ...] = ()
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 0
|
||||
assert report.latencies_ms == (None,)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_fails_when_latency_exceeds_budget() -> None:
|
||||
# Arrange: 2.5 s latency vs 2.0 s budget.
|
||||
hints = (gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks = (gte.FdrCommandAck(ack_timestamp_us=3_500_000, payload_kv={"command": "STATUSTEXT"}),)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 1
|
||||
assert report.latencies_ms == (2500.0,)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_ignores_pre_hint_acks() -> None:
|
||||
# Arrange
|
||||
hints = (gte.InboundHint(inject_timestamp_us=5_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks = (
|
||||
gte.FdrCommandAck(ack_timestamp_us=1_000_000, payload_kv={"command": "STATUSTEXT"}),
|
||||
gte.FdrCommandAck(ack_timestamp_us=6_000_000, payload_kv={"command": "STATUSTEXT"}),
|
||||
)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 1
|
||||
assert report.latencies_ms == (1000.0,)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_each_ack_matches_only_once() -> None:
|
||||
# Arrange: two hints, one ack — second hint must show as unacked.
|
||||
hints = (
|
||||
gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),
|
||||
gte.InboundHint(inject_timestamp_us=2_000_000, hint_text="RELOC:1,2,3"),
|
||||
)
|
||||
acks = (gte.FdrCommandAck(ack_timestamp_us=1_500_000, payload_kv={"command": "STATUSTEXT"}),)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.latencies_ms == (500.0, None)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_handles_no_hints() -> None:
|
||||
# Act
|
||||
report = gte.correlate_hint_acks((), ())
|
||||
|
||||
# Assert
|
||||
assert report.latencies_ms == ()
|
||||
assert not report.passes # no hints injected → can't certify AC-2
|
||||
|
||||
|
||||
# ─────────────────── haversine_distance_m ───────────────────
|
||||
|
||||
|
||||
def test_haversine_distance_m_is_zero_for_same_point() -> None:
|
||||
# Assert
|
||||
assert gte.haversine_distance_m(50.0, 36.0, 50.0, 36.0) == pytest.approx(0.0, abs=1e-6)
|
||||
|
||||
|
||||
def test_haversine_distance_m_known_baseline() -> None:
|
||||
# Arrange: ~1 deg of latitude near the equator ≈ 111.195 km on a
|
||||
# spherical earth with mean radius 6_371_008.8 m.
|
||||
expected_m = math.radians(1.0) * 6_371_008.8
|
||||
|
||||
# Act
|
||||
distance = gte.haversine_distance_m(0.0, 0.0, 1.0, 0.0)
|
||||
|
||||
# Assert
|
||||
assert distance == pytest.approx(expected_m, rel=1e-6)
|
||||
|
||||
|
||||
def test_haversine_distance_m_is_symmetric() -> None:
|
||||
# Arrange
|
||||
a = (50.0, 36.0)
|
||||
b = (50.5, 36.5)
|
||||
|
||||
# Act
|
||||
d_ab = gte.haversine_distance_m(*a, *b)
|
||||
d_ba = gte.haversine_distance_m(*b, *a)
|
||||
|
||||
# Assert
|
||||
assert d_ab == pytest.approx(d_ba, rel=1e-9)
|
||||
|
||||
|
||||
# ─────────────────── evaluate_search_region_shift ───────────────────
|
||||
|
||||
|
||||
def _region(monotonic_us: int, lat: float, lon: float, radius_m: float = 100.0) -> gte.SearchRegionRecord:
|
||||
return gte.SearchRegionRecord(
|
||||
monotonic_us=monotonic_us, centre_lat_deg=lat, centre_lon_deg=lon, radius_m=radius_m
|
||||
)
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_passes_when_post_moves_closer() -> None:
|
||||
# Arrange: hint at (50.0, 36.0); pre-region was 1 km north; post is 200 m north.
|
||||
pre = _region(1_000_000, 50.01, 36.0) # ~1.1 km from hint
|
||||
post = _region(3_000_000, 50.002, 36.0) # ~222 m from hint
|
||||
regions = [pre, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is pre
|
||||
assert report.region_after is post
|
||||
assert report.distance_before_m is not None and report.distance_after_m is not None
|
||||
assert report.distance_after_m < report.distance_before_m
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_fails_when_post_moves_further() -> None:
|
||||
# Arrange
|
||||
pre = _region(1_000_000, 50.001, 36.0)
|
||||
post = _region(3_000_000, 50.01, 36.0)
|
||||
regions = [pre, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_passes_when_no_pre_region() -> None:
|
||||
# Arrange: no pre-hint region — any post-hint region counts as a pass.
|
||||
post = _region(3_000_000, 50.0, 36.0)
|
||||
regions = [post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is None
|
||||
assert report.region_after is post
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_fails_when_no_post_region() -> None:
|
||||
# Arrange
|
||||
pre = _region(1_000_000, 50.0, 36.0)
|
||||
regions = [pre]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_after is None
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_keeps_latest_pre_region() -> None:
|
||||
# Arrange: three pre-hint regions; the LAST one is the relevant baseline.
|
||||
far = _region(500_000, 50.05, 36.0)
|
||||
close = _region(1_500_000, 50.005, 36.0)
|
||||
post = _region(3_000_000, 50.002, 36.0)
|
||||
regions = [far, close, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is close
|
||||
# The "before → after" delta must be measured against `close`, not `far`.
|
||||
expected_pre_dist = gte.haversine_distance_m(50.005, 36.0, 50.0, 36.0)
|
||||
assert report.distance_before_m == pytest.approx(expected_pre_dist, rel=1e-9)
|
||||
|
||||
|
||||
# ─────────────────── detect_hint_rejection ───────────────────
|
||||
|
||||
|
||||
def test_detect_hint_rejection_finds_bad_signature() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "BAD_SIGNATURE on hint accept path")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 1
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_ignores_pre_window_rejections() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(1_000_000, "BAD_SIGNATURE")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_ignores_post_window_rejections() -> None:
|
||||
# Arrange: window default 2_000_000 us → ends at 4_000_000 us.
|
||||
msgs = [_statustext(5_000_000, "REJECTED hint")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_passes_on_unrelated_statustext() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "EKF position OK")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_is_case_insensitive() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "bad_signature on hint accept path")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 1
|
||||
|
||||
|
||||
def test_detect_hint_rejection_records_full_text() -> None:
|
||||
# Arrange: rejection text is preserved with its original case for debugging.
|
||||
msgs = [_statustext(2_500_000, "UNAUTHORIZED hint from operator X")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_texts == ("UNAUTHORIZED hint from operator X",)
|
||||
|
||||
|
||||
def test_detect_hint_rejection_rejects_non_positive_window() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="window_us must be > 0"):
|
||||
gte.detect_hint_rejection([], inject_timestamp_us=0, window_us=0)
|
||||
|
||||
|
||||
# ─────────────────── collect_messages_to_list ───────────────────
|
||||
|
||||
|
||||
def test_collect_messages_to_list_materialises_iterator() -> None:
|
||||
# Arrange
|
||||
def _gen():
|
||||
yield _gpi(0)
|
||||
yield _gpi(1)
|
||||
|
||||
# Act
|
||||
materialised = gte.collect_messages_to_list(_gen())
|
||||
|
||||
# Assert
|
||||
assert len(materialised) == 2
|
||||
assert all(isinstance(m, TlogMessage) for m in materialised)
|
||||
@@ -473,6 +473,39 @@ def test_capture_ap_tlog_zero_duration_raises():
|
||||
so.capture_ap_tlog(host="x", duration_s=0)
|
||||
|
||||
|
||||
# capture_gcs_tlog
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_missing_env_raises(unset_replay_dir):
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="env var not set"):
|
||||
so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_missing_file_raises(replay_dir: Path):
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="fixture not found"):
|
||||
so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_returns_path(replay_dir: Path):
|
||||
# Arrange
|
||||
tlog = replay_dir / "gcs_tlog_sitl-ardupilot.tlog"
|
||||
tlog.write_bytes(b"\x00\x01\x02")
|
||||
|
||||
# Act
|
||||
out = so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
# Assert
|
||||
assert out == tlog
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_zero_duration_raises():
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="duration_s must be positive"):
|
||||
so.capture_gcs_tlog(host="x", duration_s=0)
|
||||
|
||||
|
||||
# read_ap_parameter
|
||||
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ E2E_ROOT = Path(__file__).resolve().parents[1]
|
||||
"runner/helpers/sharp_turn_detector.py",
|
||||
"runner/helpers/msp_frame_observer.py",
|
||||
"runner/helpers/ap_contract_evaluator.py",
|
||||
"runner/helpers/gcs_telemetry_evaluator.py",
|
||||
"runner/helpers/cold_start_evaluator.py",
|
||||
"runner/helpers/outlier_tolerance_evaluator.py",
|
||||
"runner/helpers/outage_request_evaluator.py",
|
||||
@@ -106,6 +107,8 @@ E2E_ROOT = Path(__file__).resolve().parents[1]
|
||||
"tests/positive/test_ft_p_09_inav.py",
|
||||
"tests/positive/test_ft_p_10_smoothing_lookback.py",
|
||||
"tests/positive/test_ft_p_11_cold_start_init.py",
|
||||
"tests/positive/test_ft_p_12_gcs_downsample.py",
|
||||
"tests/positive/test_ft_p_13_gcs_command.py",
|
||||
"tests/negative/test_ft_n_01_outlier_tolerance.py",
|
||||
"tests/negative/test_ft_n_02_sharp_turn_failure.py",
|
||||
"tests/negative/test_ft_n_03_outage_reloc.py",
|
||||
|
||||
@@ -0,0 +1,429 @@
|
||||
"""GCS telemetry evaluation for FT-P-12 + FT-P-13 (AZ-420 / AC-6.1, AC-6.2).
|
||||
|
||||
Two evaluators sourced from the GCS-side ``.tlog`` captured by
|
||||
``mavproxy-listener`` plus the FDR archive:
|
||||
|
||||
* **FT-P-12 / AC-6.1**: SUT→GCS summary cadence must land in [1, 2] Hz
|
||||
over the 60 s replay window. The SUT's C8 ``QgcTelemetryAdapter`` pairs
|
||||
``GLOBAL_POSITION_INT`` + ``NAMED_VALUE_FLOAT`` at the configured
|
||||
``summary_rate_hz``; we count ``GLOBAL_POSITION_INT`` bursts since the
|
||||
``NAMED_VALUE_FLOAT`` companion is decorative.
|
||||
* **FT-P-13 / AC-6.2**: GCS-originated ``STATUSTEXT`` carrying an operator
|
||||
re-loc hint:
|
||||
* acknowledgement latency from inject → FDR ``c8.gcs.operator_command``
|
||||
record must be ≤ 2 s (AC-2);
|
||||
* the next per-frame ``anchor_search_region`` FDR record's centre must
|
||||
move closer to the hinted location than the last pre-hint region
|
||||
(AC-3);
|
||||
* no ``BAD_SIGNATURE`` / ``UNAUTHORIZED`` STATUSTEXT may appear in the
|
||||
rejection window after the hint (AC-4).
|
||||
|
||||
All inputs are pure iterables / sequences. The tlog ingestion is
|
||||
delegated to ``runner.helpers.mavproxy_tlog_reader.iter_messages`` and
|
||||
the FDR ingestion to ``runner.helpers.fdr_reader.iter_records``.
|
||||
|
||||
Public-boundary discipline: this module does NOT import any
|
||||
``src/gps_denied_onboard`` symbol.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, Sequence
|
||||
|
||||
from .mavproxy_tlog_reader import TlogMessage
|
||||
|
||||
GCS_SUMMARY_RATE_MIN_HZ = 1.0
|
||||
GCS_SUMMARY_RATE_MAX_HZ = 2.0
|
||||
GCS_SUMMARY_POSITION_MSG_TYPE = "GLOBAL_POSITION_INT"
|
||||
GCS_SUMMARY_COMPANION_MSG_TYPE = "NAMED_VALUE_FLOAT"
|
||||
|
||||
HINT_ACK_MAX_LATENCY_MS = 2000.0
|
||||
HINT_FDR_KIND = "c8.gcs.operator_command"
|
||||
HINT_REJECTION_STATUSTEXT_TOKENS = ("BAD_SIGNATURE", "UNAUTHORIZED", "REJECTED")
|
||||
|
||||
ANCHOR_SEARCH_REGION_FDR_KIND = "anchor_search_region"
|
||||
|
||||
_EARTH_RADIUS_M = 6_371_008.8
|
||||
|
||||
|
||||
# ─────────────────────── FT-P-12 / AC-6.1 ───────────────────────
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GcsSummaryRateReport:
|
||||
"""AC-6.1: SUT→GCS summary cadence over the replay window."""
|
||||
|
||||
total_summary_messages: int
|
||||
window_us: int
|
||||
observed_rate_hz: float
|
||||
min_required_hz: float = GCS_SUMMARY_RATE_MIN_HZ
|
||||
max_required_hz: float = GCS_SUMMARY_RATE_MAX_HZ
|
||||
|
||||
@property
|
||||
def passes(self) -> bool:
|
||||
if self.window_us <= 0:
|
||||
return False
|
||||
return self.min_required_hz <= self.observed_rate_hz <= self.max_required_hz
|
||||
|
||||
|
||||
def compute_gcs_summary_rate(
|
||||
messages: Iterable[TlogMessage],
|
||||
*,
|
||||
position_msg_type: str = GCS_SUMMARY_POSITION_MSG_TYPE,
|
||||
min_required_hz: float = GCS_SUMMARY_RATE_MIN_HZ,
|
||||
max_required_hz: float = GCS_SUMMARY_RATE_MAX_HZ,
|
||||
) -> GcsSummaryRateReport:
|
||||
"""AC-6.1: rate of ``GLOBAL_POSITION_INT`` messages emitted to the GCS.
|
||||
|
||||
Each SUT→GCS summary "burst" is one ``GLOBAL_POSITION_INT`` paired
|
||||
with one ``NAMED_VALUE_FLOAT(horiz_m)`` per the C8 ``QgcTelemetryAdapter``
|
||||
implementation; only the position message is counted to avoid
|
||||
double-counting the decorative companion.
|
||||
|
||||
Rate is computed over the (first, last) timestamp span — i.e.,
|
||||
``(N-1) / window_seconds`` — to match ``compute_gps_input_rate`` in
|
||||
``ap_contract_evaluator``.
|
||||
"""
|
||||
if min_required_hz < 0:
|
||||
raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}")
|
||||
if max_required_hz < min_required_hz:
|
||||
raise ValueError(
|
||||
f"max_required_hz ({max_required_hz}) must be ≥ "
|
||||
f"min_required_hz ({min_required_hz})"
|
||||
)
|
||||
|
||||
timestamps = [m.timestamp_us for m in messages if m.msg_type == position_msg_type]
|
||||
if len(timestamps) < 2:
|
||||
return GcsSummaryRateReport(
|
||||
total_summary_messages=len(timestamps),
|
||||
window_us=0,
|
||||
observed_rate_hz=0.0,
|
||||
min_required_hz=min_required_hz,
|
||||
max_required_hz=max_required_hz,
|
||||
)
|
||||
window_us = timestamps[-1] - timestamps[0]
|
||||
if window_us <= 0:
|
||||
return GcsSummaryRateReport(
|
||||
total_summary_messages=len(timestamps),
|
||||
window_us=window_us,
|
||||
observed_rate_hz=0.0,
|
||||
min_required_hz=min_required_hz,
|
||||
max_required_hz=max_required_hz,
|
||||
)
|
||||
observed_hz = (len(timestamps) - 1) / (window_us / 1_000_000.0)
|
||||
return GcsSummaryRateReport(
|
||||
total_summary_messages=len(timestamps),
|
||||
window_us=window_us,
|
||||
observed_rate_hz=observed_hz,
|
||||
min_required_hz=min_required_hz,
|
||||
max_required_hz=max_required_hz,
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────── FT-P-13 / AC-6.2 ───────────────────────
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class InboundHint:
|
||||
"""A GCS-originated re-loc hint observed inbound on the SUT side.
|
||||
|
||||
Sourced from a ``STATUSTEXT`` MAVLink message captured in the GCS
|
||||
tlog. ``hint_text`` is the raw payload (the operator's hint string).
|
||||
"""
|
||||
|
||||
inject_timestamp_us: int
|
||||
hint_text: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FdrCommandAck:
|
||||
"""An FDR record acknowledging the inbound operator command.
|
||||
|
||||
Sourced from ``kind='log'`` records whose payload ``kv.kind`` equals
|
||||
``c8.gcs.operator_command`` (the kind the QGC adapter emits when it
|
||||
translates an inbound command into an ``OperatorCommand`` DTO).
|
||||
"""
|
||||
|
||||
ack_timestamp_us: int
|
||||
payload_kv: dict
|
||||
|
||||
|
||||
def correlate_hint_acks(
|
||||
hints: Sequence[InboundHint],
|
||||
acks: Sequence[FdrCommandAck],
|
||||
) -> "HintAckReport":
|
||||
"""AC-6.2 / AC-2: pair each hint with its earliest succeeding ack.
|
||||
|
||||
Pairing is greedy in injection order. A given FDR ack can match at
|
||||
most one hint; an ack whose timestamp precedes every hint is
|
||||
ignored (it cannot be an ack for those hints).
|
||||
"""
|
||||
sorted_acks = sorted(acks, key=lambda a: a.ack_timestamp_us)
|
||||
cursor = 0
|
||||
pairs: list[tuple[InboundHint, FdrCommandAck | None]] = []
|
||||
for hint in hints:
|
||||
match: FdrCommandAck | None = None
|
||||
while cursor < len(sorted_acks):
|
||||
ack = sorted_acks[cursor]
|
||||
if ack.ack_timestamp_us < hint.inject_timestamp_us:
|
||||
cursor += 1
|
||||
continue
|
||||
match = ack
|
||||
cursor += 1
|
||||
break
|
||||
pairs.append((hint, match))
|
||||
latencies: list[float | None] = []
|
||||
for hint, ack in pairs:
|
||||
if ack is None:
|
||||
latencies.append(None)
|
||||
else:
|
||||
latencies.append((ack.ack_timestamp_us - hint.inject_timestamp_us) / 1000.0)
|
||||
return HintAckReport(
|
||||
hints=tuple(hints),
|
||||
acks=tuple(sorted_acks),
|
||||
latencies_ms=tuple(latencies),
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HintAckReport:
|
||||
"""AC-2 of FT-P-13: per-hint inject→ack latency."""
|
||||
|
||||
hints: tuple[InboundHint, ...]
|
||||
acks: tuple[FdrCommandAck, ...]
|
||||
latencies_ms: tuple[float | None, ...]
|
||||
max_required_ms: float = HINT_ACK_MAX_LATENCY_MS
|
||||
|
||||
@property
|
||||
def acked_count(self) -> int:
|
||||
return sum(1 for latency in self.latencies_ms if latency is not None)
|
||||
|
||||
@property
|
||||
def passes(self) -> bool:
|
||||
if not self.hints:
|
||||
return False
|
||||
return all(
|
||||
latency is not None and latency <= self.max_required_ms
|
||||
for latency in self.latencies_ms
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SearchRegionRecord:
|
||||
"""One ``anchor_search_region`` FDR record.
|
||||
|
||||
Schema (AC-NEW-3 family): per-frame record of the satellite-anchor
|
||||
search region the C2 backbone is currently scanning. Centre is in
|
||||
WGS84 degrees; radius is in metres.
|
||||
"""
|
||||
|
||||
monotonic_us: int
|
||||
centre_lat_deg: float
|
||||
centre_lon_deg: float
|
||||
radius_m: float
|
||||
|
||||
|
||||
def haversine_distance_m(
|
||||
lat_a_deg: float, lon_a_deg: float, lat_b_deg: float, lon_b_deg: float
|
||||
) -> float:
|
||||
"""Great-circle distance between two WGS84 points in metres.
|
||||
|
||||
Uses the spherical haversine formula with the mean Earth radius.
|
||||
Accurate to ≪1 m for the sub-100 km separations FT-P-13 cares about.
|
||||
"""
|
||||
phi_a = math.radians(lat_a_deg)
|
||||
phi_b = math.radians(lat_b_deg)
|
||||
dphi = math.radians(lat_b_deg - lat_a_deg)
|
||||
dlam = math.radians(lon_b_deg - lon_a_deg)
|
||||
a = math.sin(dphi / 2) ** 2 + math.cos(phi_a) * math.cos(phi_b) * math.sin(dlam / 2) ** 2
|
||||
c = 2 * math.asin(min(1.0, math.sqrt(a)))
|
||||
return _EARTH_RADIUS_M * c
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SearchRegionShiftReport:
|
||||
"""AC-3 of FT-P-13: did the search region shift toward the hint?"""
|
||||
|
||||
hint_lat_deg: float
|
||||
hint_lon_deg: float
|
||||
region_before: SearchRegionRecord | None
|
||||
region_after: SearchRegionRecord | None
|
||||
distance_before_m: float | None
|
||||
distance_after_m: float | None
|
||||
|
||||
@property
|
||||
def passes(self) -> bool:
|
||||
if self.region_after is None or self.distance_after_m is None:
|
||||
return False
|
||||
if self.region_before is None or self.distance_before_m is None:
|
||||
return True
|
||||
return self.distance_after_m < self.distance_before_m
|
||||
|
||||
|
||||
def evaluate_search_region_shift(
|
||||
regions: Sequence[SearchRegionRecord],
|
||||
hint_inject_timestamp_us: int,
|
||||
hint_lat_deg: float,
|
||||
hint_lon_deg: float,
|
||||
) -> SearchRegionShiftReport:
|
||||
"""AC-3: compare the last pre-hint region to the first post-hint region.
|
||||
|
||||
The "shift toward the hint" signal is positive iff the first
|
||||
region observed AFTER ``hint_inject_timestamp_us`` is closer to
|
||||
``(hint_lat_deg, hint_lon_deg)`` than the last region observed
|
||||
BEFORE the inject. If no pre-hint region exists, any post-hint
|
||||
region counts as a pass (the bias was set before the C2 backbone
|
||||
had a chance to publish anything).
|
||||
"""
|
||||
region_before: SearchRegionRecord | None = None
|
||||
region_after: SearchRegionRecord | None = None
|
||||
for region in regions:
|
||||
if region.monotonic_us < hint_inject_timestamp_us:
|
||||
region_before = region # keep moving forward to find the last pre-hint
|
||||
elif region_after is None:
|
||||
region_after = region
|
||||
distance_before = (
|
||||
haversine_distance_m(
|
||||
region_before.centre_lat_deg,
|
||||
region_before.centre_lon_deg,
|
||||
hint_lat_deg,
|
||||
hint_lon_deg,
|
||||
)
|
||||
if region_before is not None
|
||||
else None
|
||||
)
|
||||
distance_after = (
|
||||
haversine_distance_m(
|
||||
region_after.centre_lat_deg,
|
||||
region_after.centre_lon_deg,
|
||||
hint_lat_deg,
|
||||
hint_lon_deg,
|
||||
)
|
||||
if region_after is not None
|
||||
else None
|
||||
)
|
||||
return SearchRegionShiftReport(
|
||||
hint_lat_deg=hint_lat_deg,
|
||||
hint_lon_deg=hint_lon_deg,
|
||||
region_before=region_before,
|
||||
region_after=region_after,
|
||||
distance_before_m=distance_before,
|
||||
distance_after_m=distance_after,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HintRejectionReport:
|
||||
"""AC-4 of FT-P-13: no security/auth rejection of the well-formed hint."""
|
||||
|
||||
inject_timestamp_us: int
|
||||
window_us: int
|
||||
rejection_count: int
|
||||
rejection_texts: tuple[str, ...]
|
||||
|
||||
@property
|
||||
def passes(self) -> bool:
|
||||
return self.rejection_count == 0
|
||||
|
||||
|
||||
def detect_hint_rejection(
|
||||
messages: Iterable[TlogMessage],
|
||||
inject_timestamp_us: int,
|
||||
*,
|
||||
window_us: int = int(HINT_ACK_MAX_LATENCY_MS * 1000.0),
|
||||
rejection_tokens: Sequence[str] = HINT_REJECTION_STATUSTEXT_TOKENS,
|
||||
) -> HintRejectionReport:
|
||||
"""AC-4: scan ``STATUSTEXT`` in the post-inject window for rejection markers.
|
||||
|
||||
A rejection is any ``STATUSTEXT`` whose payload ``text`` field (case
|
||||
insensitive) contains any of ``rejection_tokens``. The window opens
|
||||
at the inject timestamp and closes ``window_us`` later — beyond that
|
||||
a rejection cannot be causally tied to this hint.
|
||||
"""
|
||||
if window_us <= 0:
|
||||
raise ValueError(f"window_us must be > 0, got {window_us}")
|
||||
window_end = inject_timestamp_us + window_us
|
||||
tokens_upper = tuple(token.upper() for token in rejection_tokens)
|
||||
rejection_texts: list[str] = []
|
||||
for msg in messages:
|
||||
if msg.msg_type != "STATUSTEXT":
|
||||
continue
|
||||
if not (inject_timestamp_us <= msg.timestamp_us <= window_end):
|
||||
continue
|
||||
text = str(msg.fields.get("text", "")).upper()
|
||||
if any(token in text for token in tokens_upper):
|
||||
rejection_texts.append(str(msg.fields.get("text", "")))
|
||||
return HintRejectionReport(
|
||||
inject_timestamp_us=inject_timestamp_us,
|
||||
window_us=window_us,
|
||||
rejection_count=len(rejection_texts),
|
||||
rejection_texts=tuple(rejection_texts),
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────── tlog→hint adapter ───────────────────────
|
||||
|
||||
|
||||
def extract_inbound_hints(
|
||||
messages: Iterable[TlogMessage],
|
||||
*,
|
||||
hint_prefix: str = "RELOC:",
|
||||
) -> list[InboundHint]:
|
||||
"""Extract operator-injected reloc-hint STATUSTEXTs from the tlog.
|
||||
|
||||
The test fixture builder injects ``STATUSTEXT`` messages whose
|
||||
payload ``text`` begins with ``hint_prefix`` (default ``"RELOC:"``)
|
||||
followed by a comma-separated payload (e.g. ``"RELOC:50.0,36.0,200"``
|
||||
encoding lat,lon,radius_m). The exact payload shape is not
|
||||
interpreted here — that belongs to the scenario test. We only
|
||||
identify which STATUSTEXTs are hints so the FDR correlator knows
|
||||
when the operator pressed "send".
|
||||
"""
|
||||
out: list[InboundHint] = []
|
||||
for msg in messages:
|
||||
if msg.msg_type != "STATUSTEXT":
|
||||
continue
|
||||
text = str(msg.fields.get("text", ""))
|
||||
if not text.startswith(hint_prefix):
|
||||
continue
|
||||
out.append(InboundHint(inject_timestamp_us=msg.timestamp_us, hint_text=text))
|
||||
return out
|
||||
|
||||
|
||||
def parse_reloc_payload(hint_text: str, *, hint_prefix: str = "RELOC:") -> tuple[float, float, float]:
|
||||
"""Parse ``RELOC:<lat>,<lon>,<radius_m>`` into ``(lat, lon, radius)``.
|
||||
|
||||
Raises ``ValueError`` on malformed payload — scenarios should let
|
||||
that surface so the run fails loudly rather than silently scoring
|
||||
AC-3 against garbage coordinates.
|
||||
"""
|
||||
if not hint_text.startswith(hint_prefix):
|
||||
raise ValueError(
|
||||
f"hint text does not start with {hint_prefix!r}: {hint_text!r}"
|
||||
)
|
||||
body = hint_text[len(hint_prefix):]
|
||||
parts = body.split(",")
|
||||
if len(parts) != 3:
|
||||
raise ValueError(
|
||||
f"hint payload must have 3 comma-separated fields "
|
||||
f"(lat,lon,radius_m); got {len(parts)}: {body!r}"
|
||||
)
|
||||
try:
|
||||
lat = float(parts[0])
|
||||
lon = float(parts[1])
|
||||
radius_m = float(parts[2])
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"hint payload fields must be floats: {body!r}") from exc
|
||||
return (lat, lon, radius_m)
|
||||
|
||||
|
||||
def collect_messages_to_list(messages: Iterable[TlogMessage]) -> list[TlogMessage]:
|
||||
"""Materialise an iterator into a list — convenience for multi-pass eval.
|
||||
|
||||
Mirrors ``ap_contract_evaluator.collect_messages_to_list``: scenarios
|
||||
parse the tlog once via ``iter_messages`` and run multiple analyzers
|
||||
over the result.
|
||||
"""
|
||||
return list(messages)
|
||||
@@ -29,6 +29,8 @@ Fixture file naming (under `${E2E_SITL_REPLAY_DIR}/`):
|
||||
{messages: [{image_id?, lat_deg, lon_deg} | null, ...]}
|
||||
* `ap_parameters_<host>.json` — {<param_name>: <value>, ...}
|
||||
* `ap_tlog_<host>.tlog` — raw mavproxy tlog (any binary content)
|
||||
* `gcs_tlog_<host>.tlog` — raw mavproxy-listener tlog from the GCS link
|
||||
(SUT→GCS summary stream + GCS→SUT operator commands; FT-P-12, FT-P-13)
|
||||
* `inav_handshake_<host>.json` — {established_within_s: float | None}
|
||||
* `inav_msp_frames_<host>.json` — {frames: [...], expected_num_sat: int}
|
||||
* `inav_gps_state_<host>.json` — {fix_type, num_sat, provider}
|
||||
@@ -418,6 +420,35 @@ def capture_ap_tlog(host: str, duration_s: float) -> Path:
|
||||
return path
|
||||
|
||||
|
||||
def capture_gcs_tlog(host: str, duration_s: float) -> Path:
|
||||
"""Return the path to the GCS-side mavproxy-listener tlog for ``host``.
|
||||
|
||||
Fixture: ``${E2E_SITL_REPLAY_DIR}/gcs_tlog_<host>.tlog``. The tlog
|
||||
captures both directions over the QGC GCS link — SUT→GCS summary
|
||||
bursts (``GLOBAL_POSITION_INT`` + ``NAMED_VALUE_FLOAT``) and
|
||||
GCS→SUT operator commands (``STATUSTEXT`` reloc-hints,
|
||||
``COMMAND_LONG`` parameter reads, etc.).
|
||||
|
||||
``duration_s`` is recorded for future live-mode use but ignored here
|
||||
— under FDR-replay the fixture file IS the captured stream.
|
||||
|
||||
Raises ``RuntimeError`` if env var unset or fixture missing.
|
||||
"""
|
||||
if duration_s <= 0:
|
||||
raise RuntimeError(f"capture_gcs_tlog: duration_s must be positive; got {duration_s}")
|
||||
root = replay_dir()
|
||||
if root is None:
|
||||
raise RuntimeError(
|
||||
f"capture_gcs_tlog: {_ENV_VAR} env var not set"
|
||||
)
|
||||
path = root / f"gcs_tlog_{host}.tlog"
|
||||
if not path.exists():
|
||||
raise RuntimeError(
|
||||
f"capture_gcs_tlog: fixture not found at {path}"
|
||||
)
|
||||
return path
|
||||
|
||||
|
||||
# read_ap_parameter — reads from param-dump JSON
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
"""FT-P-12 — GCS downsample at 1-2 Hz (AZ-420 / AC-6.1).
|
||||
|
||||
The full scenario:
|
||||
|
||||
1. Start the SUT against the SITL container; ``mavproxy-listener``
|
||||
captures the SUT↔GCS link to ``${E2E_SITL_REPLAY_DIR}/gcs_tlog_<host>.tlog``.
|
||||
2. Replay ``flight_derkachi.mp4`` for 60 s through the SUT's file frame
|
||||
source so the C8 ``QgcTelemetryAdapter`` produces summary bursts.
|
||||
3. After replay, parse the captured tlog for SUT-emitted
|
||||
``GLOBAL_POSITION_INT`` (the position half of the QGC summary pair)
|
||||
over the 60 s window.
|
||||
4. AC-1: observed rate must land in [1, 2] Hz inclusive (AC-6.1).
|
||||
5. AC-5: parameterised per ``(fc_adapter, vio_strategy)``.
|
||||
|
||||
Gated on:
|
||||
|
||||
* ``runner.helpers.frame_source_replay`` — owned by AZ-441 (still a
|
||||
stub today; scenario skips via ``sitl_replay_ready``).
|
||||
* ``runner.helpers.sitl_observer.capture_gcs_tlog`` — owned by AZ-420
|
||||
(AP-side parity surface to ``capture_ap_tlog``; loads the
|
||||
``gcs_tlog_<host>.tlog`` FDR-replay fixture).
|
||||
* ``runner.helpers.gcs_telemetry_evaluator.compute_gcs_summary_rate`` —
|
||||
pure-logic evaluator covered by
|
||||
``e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from runner.helpers import gcs_telemetry_evaluator as gte
|
||||
from runner.helpers import mavproxy_tlog_reader as mtr
|
||||
|
||||
DERKACHI_DIR = (
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "_docs"
|
||||
/ "00_problem"
|
||||
/ "input_data"
|
||||
/ "flight_derkachi"
|
||||
)
|
||||
DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4"
|
||||
REPLAY_WINDOW_S = 60
|
||||
|
||||
|
||||
@pytest.mark.traces_to("AC-6.1,AC-1,AC-5")
|
||||
def test_ft_p_12_gcs_downsample(
|
||||
fc_adapter: str,
|
||||
vio_strategy: str,
|
||||
evidence_dir, # type: ignore[no-untyped-def]
|
||||
run_id: str,
|
||||
nfr_recorder, # type: ignore[no-untyped-def]
|
||||
sitl_replay_ready: bool,
|
||||
) -> None:
|
||||
"""Full FT-P-12 scenario (AC-6.1). See module docstring.
|
||||
|
||||
AC-1: GCS rate ∈ [1, 2] Hz over the 60 s window — covered by
|
||||
``compute_gcs_summary_rate``; unit-tested in
|
||||
``test_gcs_telemetry_evaluator.py``.
|
||||
AC-5: parameterised across ``(fc_adapter, vio_strategy)``.
|
||||
"""
|
||||
if not sitl_replay_ready:
|
||||
pytest.skip(
|
||||
"FT-P-12 full replay requires `E2E_SITL_REPLAY_DIR` to point at a "
|
||||
"prepared SITL replay fixture exposing `gcs_tlog_<host>.tlog` "
|
||||
"(AZ-595 + AZ-420 fixture builder). Pure-logic AC-6.1 coverage "
|
||||
"lives in e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py."
|
||||
)
|
||||
|
||||
from runner.helpers import sitl_observer
|
||||
from runner.helpers.frame_source_replay import FrameSourceReplayer
|
||||
|
||||
# 1. Drive replay; the mavproxy-listener captures the GCS link in
|
||||
# parallel via the docker-compose fixture wiring (no in-process
|
||||
# work needed here — the listener writes to disk).
|
||||
sitl_host = "sitl-ardupilot" if fc_adapter == "ardupilot" else "sitl-inav"
|
||||
FrameSourceReplayer(_resolve_frame_sink()).replay_video(DERKACHI_MP4)
|
||||
tlog_path = sitl_observer.capture_gcs_tlog(host=sitl_host, duration_s=REPLAY_WINDOW_S)
|
||||
|
||||
# 2. Materialise the tlog once (iter_messages is single-pass).
|
||||
msgs = gte.collect_messages_to_list(mtr.iter_messages(tlog_path))
|
||||
if not msgs:
|
||||
pytest.fail(f"FT-P-12: empty GCS tlog at {tlog_path}")
|
||||
|
||||
# 3. AC-1: GCS summary rate.
|
||||
rate = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# 4. NFR metrics.
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_12.gcs_summary_rate_hz", rate.observed_rate_hz, ac_id="AC-6.1"
|
||||
)
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_12.gcs_summary_messages", float(rate.total_summary_messages), ac_id="AC-6.1"
|
||||
)
|
||||
|
||||
# 5. AC-1 assertion.
|
||||
assert rate.passes, (
|
||||
f"AC-6.1 (GCS rate ∈ [{rate.min_required_hz}, {rate.max_required_hz}] Hz) "
|
||||
f"failed: observed_rate_hz={rate.observed_rate_hz:.3f}, "
|
||||
f"messages={rate.total_summary_messages}, window_us={rate.window_us}"
|
||||
)
|
||||
|
||||
|
||||
def _resolve_frame_sink(): # type: ignore[no-untyped-def]
|
||||
"""Return a replay-mode `FrameSink` (counter-only; AZ-597)."""
|
||||
from runner.helpers.replay_mode import NullFrameSink
|
||||
|
||||
return NullFrameSink()
|
||||
@@ -0,0 +1,210 @@
|
||||
"""FT-P-13 — GCS command path: operator re-loc hint (AZ-420 / AC-6.2).
|
||||
|
||||
The full scenario:
|
||||
|
||||
1. Drive the SUT into ``dead_reckoned`` state (e.g. via a synthesised
|
||||
mid-blackout segment, FT-N-03 style). ``mavproxy-listener`` captures
|
||||
the SUT↔GCS link to ``gcs_tlog_<host>.tlog``.
|
||||
2. While the SUT is in ``dead_reckoned``, the fixture builder has
|
||||
injected one ``STATUSTEXT`` from mavproxy carrying the operator's
|
||||
re-loc hint (payload ``RELOC:<lat>,<lon>,<radius_m>``).
|
||||
3. The SUT's C8 ``QgcTelemetryAdapter`` translates the inbound command
|
||||
into an ``OperatorCommand`` DTO and emits an FDR ``log`` record with
|
||||
``payload.kind == "c8.gcs.operator_command"``.
|
||||
4. The next nav-camera frame after the hint causes C2 to publish a new
|
||||
per-frame ``anchor_search_region`` FDR record whose centre has
|
||||
shifted toward the hint relative to the last pre-hint region.
|
||||
5. No ``BAD_SIGNATURE`` / ``UNAUTHORIZED`` / ``REJECTED`` STATUSTEXT is
|
||||
emitted in the ack window — the hint is well-formed, not a security
|
||||
event.
|
||||
|
||||
ACs:
|
||||
|
||||
* AC-1: FT-P-12 GCS rate — covered by ``test_ft_p_12_gcs_downsample``;
|
||||
this file does NOT re-assert it (single source of truth).
|
||||
* AC-2: hint ack via FDR within ≤2 s — ``correlate_hint_acks``.
|
||||
* AC-3: search prior bias toward hint — ``evaluate_search_region_shift``
|
||||
against ``anchor_search_region`` FDR records.
|
||||
* AC-4: no security/auth rejection — ``detect_hint_rejection``.
|
||||
* AC-5: parameterised per ``(fc_adapter, vio_strategy)``.
|
||||
|
||||
Gated on:
|
||||
|
||||
* ``runner.helpers.frame_source_replay`` — owned by AZ-441 (still a
|
||||
stub today; scenario skips via ``sitl_replay_ready``).
|
||||
* ``runner.helpers.sitl_observer.capture_gcs_tlog`` — owned by AZ-420.
|
||||
* ``runner.helpers.fdr_reader`` — owned by AZ-594.
|
||||
* ``runner.helpers.gcs_telemetry_evaluator`` — unit-tested in
|
||||
``e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from runner.helpers import gcs_telemetry_evaluator as gte
|
||||
from runner.helpers import mavproxy_tlog_reader as mtr
|
||||
|
||||
DERKACHI_DIR = (
|
||||
Path(__file__).resolve().parents[3]
|
||||
/ "_docs"
|
||||
/ "00_problem"
|
||||
/ "input_data"
|
||||
/ "flight_derkachi"
|
||||
)
|
||||
DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4"
|
||||
REPLAY_WINDOW_S = 60
|
||||
|
||||
|
||||
@pytest.mark.traces_to("AC-6.2,AC-2,AC-3,AC-4,AC-5")
|
||||
def test_ft_p_13_gcs_command(
|
||||
fc_adapter: str,
|
||||
vio_strategy: str,
|
||||
evidence_dir, # type: ignore[no-untyped-def]
|
||||
run_id: str,
|
||||
nfr_recorder, # type: ignore[no-untyped-def]
|
||||
sitl_replay_ready: bool,
|
||||
) -> None:
|
||||
"""Full FT-P-13 scenario (AC-6.2). See module docstring.
|
||||
|
||||
AC-2: hint ack ≤2 s via FDR ``c8.gcs.operator_command`` record —
|
||||
covered by ``correlate_hint_acks`` + ``HintAckReport.passes``.
|
||||
AC-3: anchor search region biases toward hint — covered by
|
||||
``evaluate_search_region_shift``.
|
||||
AC-4: no rejection STATUSTEXT in the ack window — covered by
|
||||
``detect_hint_rejection``.
|
||||
AC-5: parameterised across ``(fc_adapter, vio_strategy)``.
|
||||
"""
|
||||
if not sitl_replay_ready:
|
||||
pytest.skip(
|
||||
"FT-P-13 full replay requires `E2E_SITL_REPLAY_DIR` to point at a "
|
||||
"prepared SITL replay fixture exposing `gcs_tlog_<host>.tlog` "
|
||||
"with an injected `RELOC:` STATUSTEXT plus the matching FDR "
|
||||
"`c8.gcs.operator_command` ack record and `anchor_search_region` "
|
||||
"per-frame records (AZ-595 + AZ-420 fixture builder). Pure-logic "
|
||||
"AC-6.2 coverage lives in "
|
||||
"e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py."
|
||||
)
|
||||
|
||||
from runner.helpers import fdr_reader, sitl_observer
|
||||
from runner.helpers.frame_source_replay import FrameSourceReplayer
|
||||
|
||||
sitl_host = "sitl-ardupilot" if fc_adapter == "ardupilot" else "sitl-inav"
|
||||
|
||||
# 1. Drive replay; the mavproxy-listener and FDR sink capture in parallel.
|
||||
FrameSourceReplayer(_resolve_frame_sink()).replay_video(DERKACHI_MP4)
|
||||
tlog_path = sitl_observer.capture_gcs_tlog(host=sitl_host, duration_s=REPLAY_WINDOW_S)
|
||||
|
||||
# 2. Materialise the tlog ONCE (iter_messages is single-pass) and
|
||||
# extract the operator-injected RELOC: hints.
|
||||
msgs = gte.collect_messages_to_list(mtr.iter_messages(tlog_path))
|
||||
if not msgs:
|
||||
pytest.fail(f"FT-P-13: empty GCS tlog at {tlog_path}")
|
||||
hints = gte.extract_inbound_hints(msgs)
|
||||
if not hints:
|
||||
pytest.fail(
|
||||
f"FT-P-13: GCS tlog at {tlog_path} contains no `RELOC:` STATUSTEXT — "
|
||||
"the fixture builder must inject at least one operator re-loc hint."
|
||||
)
|
||||
|
||||
# 3. Walk the FDR archive for c8.gcs.operator_command acks +
|
||||
# anchor_search_region per-frame records.
|
||||
fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr"
|
||||
acks: list[gte.FdrCommandAck] = []
|
||||
regions: list[gte.SearchRegionRecord] = []
|
||||
for rec in fdr_reader.iter_records(fdr_root):
|
||||
if (
|
||||
rec.record_type == "log"
|
||||
and rec.payload.get("kind") == gte.HINT_FDR_KIND
|
||||
and isinstance(rec.payload.get("kv"), dict)
|
||||
):
|
||||
acks.append(
|
||||
gte.FdrCommandAck(
|
||||
ack_timestamp_us=int(rec.monotonic_ms) * 1000,
|
||||
payload_kv=dict(rec.payload["kv"]), # type: ignore[arg-type]
|
||||
)
|
||||
)
|
||||
elif rec.record_type == gte.ANCHOR_SEARCH_REGION_FDR_KIND:
|
||||
regions.append(
|
||||
gte.SearchRegionRecord(
|
||||
monotonic_us=int(rec.monotonic_ms) * 1000,
|
||||
centre_lat_deg=float(rec.payload["centre_lat_deg"]), # type: ignore[arg-type]
|
||||
centre_lon_deg=float(rec.payload["centre_lon_deg"]), # type: ignore[arg-type]
|
||||
radius_m=float(rec.payload["radius_m"]), # type: ignore[arg-type]
|
||||
)
|
||||
)
|
||||
|
||||
# 4. AC-2: ack latencies.
|
||||
ack_report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# 5. AC-3: search-region shift (evaluated against the FIRST hint only;
|
||||
# multi-hint scenarios are out of scope for AC-6.2 single-pass).
|
||||
first_hint = hints[0]
|
||||
hint_lat, hint_lon, _radius_m = gte.parse_reloc_payload(first_hint.hint_text)
|
||||
shift_report = gte.evaluate_search_region_shift(
|
||||
regions,
|
||||
hint_inject_timestamp_us=first_hint.inject_timestamp_us,
|
||||
hint_lat_deg=hint_lat,
|
||||
hint_lon_deg=hint_lon,
|
||||
)
|
||||
|
||||
# 6. AC-4: no rejection in the ack window.
|
||||
rejection_report = gte.detect_hint_rejection(msgs, first_hint.inject_timestamp_us)
|
||||
|
||||
# 7. NFR metrics.
|
||||
first_latency = ack_report.latencies_ms[0] if ack_report.latencies_ms else None
|
||||
if first_latency is not None:
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.hint_ack_latency_ms", first_latency, ac_id="AC-2"
|
||||
)
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.hint_count", float(len(hints)), ac_id="AC-2"
|
||||
)
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.acked_count", float(ack_report.acked_count), ac_id="AC-2"
|
||||
)
|
||||
if shift_report.distance_after_m is not None:
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.search_region_distance_after_m",
|
||||
shift_report.distance_after_m,
|
||||
ac_id="AC-3",
|
||||
)
|
||||
if shift_report.distance_before_m is not None:
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.search_region_distance_before_m",
|
||||
shift_report.distance_before_m,
|
||||
ac_id="AC-3",
|
||||
)
|
||||
nfr_recorder.record_metric(
|
||||
"ft_p_13.rejection_count", float(rejection_report.rejection_count), ac_id="AC-4"
|
||||
)
|
||||
|
||||
# 8. AC assertions.
|
||||
assert ack_report.passes, (
|
||||
f"AC-2 (hint ack ≤{ack_report.max_required_ms} ms via FDR "
|
||||
f"`{gte.HINT_FDR_KIND}` record) failed: "
|
||||
f"hints={len(ack_report.hints)}, acked={ack_report.acked_count}, "
|
||||
f"latencies_ms={ack_report.latencies_ms}"
|
||||
)
|
||||
assert shift_report.passes, (
|
||||
"AC-3 (anchor_search_region centre shifts toward hint) failed: "
|
||||
f"region_before={shift_report.region_before}, "
|
||||
f"region_after={shift_report.region_after}, "
|
||||
f"distance_before_m={shift_report.distance_before_m}, "
|
||||
f"distance_after_m={shift_report.distance_after_m}"
|
||||
)
|
||||
assert rejection_report.passes, (
|
||||
f"AC-4 (no rejection STATUSTEXT in {rejection_report.window_us / 1e6:.1f} s "
|
||||
"post-inject window) failed: "
|
||||
f"rejection_count={rejection_report.rejection_count}, "
|
||||
f"texts={rejection_report.rejection_texts}"
|
||||
)
|
||||
|
||||
|
||||
def _resolve_frame_sink(): # type: ignore[no-untyped-def]
|
||||
"""Return a replay-mode `FrameSink` (counter-only; AZ-597)."""
|
||||
from runner.helpers.replay_mode import NullFrameSink
|
||||
|
||||
return NullFrameSink()
|
||||
Reference in New Issue
Block a user