Files
gps-denied-onboard/tests/unit/test_az274_fdr_overrun_policy.py
Oleksandr Bezdieniezhnykh ba20c2d195 [AZ-273] [AZ-274] [AZ-275] [AZ-267] [AZ-268] FDR producer chain + log bridge + contract test
AZ-273: lock-free SPSC ring buffer with pre-allocated slots, power-of-
two capacity, opt-in SPSC guard, and EnqueueResult / FdrSpscViolationError
on the public surface. make_fdr_client caches one client per producer_id
and reads capacity from config.fdr.per_producer_capacity with fallback
to queue_size.
AZ-274: default_overrun_policy implements drop-oldest + retry + immediate
marker emission, with prior-marker dropped_count folding via _evict_one
so user-loss info is never lost across iterations. ERROR diagnostic is
rate-limited to <=1/sec per producer.
AZ-275: FakeFdrSink mirrors the FdrClient public surface and reuses the
production default_overrun_policy via a duck-typed _PolicyAdapter. The
test-only records/all_records_ever properties let component tests assert
both in-buffer and lifetime state. tests/conftest.py registers the
fake_fdr_sink fixture and an AST architecture lint forbids production
imports of fakes.
AZ-267: FdrLogBridgeHandler installs on the root logger via wire_log_bridge
and forwards only WARN+ERROR records into the FDR with kind="log".
Thread-local recursion guard short-circuits internal logging; saturated-
queue diagnostics go to stderr every N=1000 drops.
AZ-268: tests/contract/log_schema.py covers every row of the schema's
Test Cases table plus the "DEBUG+INFO never reach FDR" invariant.
pyproject.toml registers the contract pytest marker and the
contract-mandated log_schema.py file-name.
251 unit + contract tests pass (48 new). Review verdict:
PASS_WITH_WARNINGS; findings are NFR-perf deferrals + documented
relaxation of AZ-274 AC-2 coalescing under permanently-stalled consumer.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 03:00:49 +03:00

257 lines
8.8 KiB
Python

"""AZ-274 — Drop-oldest + ``kind="overrun"`` record emission policy.
Verifies the contract-relevant ACs (1, 2, 3, 5, 6) of the policy.
AC-4 (composition-root wiring) is covered by
``test_ac4_make_fdr_client_wires_overrun_policy`` below — it asserts that
every ``make_fdr_client`` call returns a client with ``on_overrun`` set,
which is the behavioural invariant required by the policy contract.
NFR-perf (steady-state overhead ≤ 0.5 µs, cold-path ≤ 20 µs) is
deferred to a follow-up perf-instrumentation task.
"""
from __future__ import annotations
import logging
import time
from collections.abc import Iterator
import pytest
from gps_denied_onboard.config import Config, FdrConfig
from gps_denied_onboard.fdr_client import (
EnqueueResult,
FdrClient,
FdrRecord,
make_fdr_client,
)
from gps_denied_onboard.fdr_client.client import _cached_clients, _reset_for_tests
from gps_denied_onboard.fdr_client.overrun_policy import (
default_overrun_policy,
)
from gps_denied_onboard.fdr_client.records import OVERRUN_KIND, OVERRUN_PRODUCER_ID
@pytest.fixture(autouse=True)
def _reset_cache() -> Iterator[None]:
    """Run ``_reset_for_tests`` around every test in this module.

    The hook is invoked once before the test body and once after it, so
    no test observes state left behind by a previous one.
    """
    # Setup: start from a freshly reset client module.
    _reset_for_tests()
    yield None
    # Teardown: reset again so nothing leaks into the next test.
    _reset_for_tests()
def _make_record(producer_id: str = "c1_vio", frame_id: int = 0) -> FdrRecord:
    """Build a ``kind="log"`` record with a fixed timestamp for use in tests.

    Args:
        producer_id: Used both as the record's ``producer_id`` and as the
            payload's ``component``.
        frame_id: Stored in the payload so tests can tell records apart.

    Returns:
        A schema-version-1 ``FdrRecord`` carrying an INFO-level log payload.
    """
    log_payload = {
        "level": "INFO",
        "component": producer_id,
        "frame_id": frame_id,
        "kind": "test.tick",
        "msg": "hello",
        "kv": {},
        "exc": None,
    }
    record = FdrRecord(
        schema_version=1,
        ts="2026-05-11T00:00:00.000000Z",
        producer_id=producer_id,
        kind="log",
        payload=log_payload,
    )
    return record
def _wire(client: FdrClient) -> FdrClient:
    """Install the production overrun policy on *client* and return it.

    The policy is created by ``default_overrun_policy(client)`` and stored
    on ``client.on_overrun``; the same client object is returned so the
    call can wrap a constructor expression.
    """
    policy = default_overrun_policy(client)
    client.on_overrun = policy
    return client
# ---------------------------------------------------------------------------
# AC-1: drop-oldest produces the canonical overrun record after capacity-1 fill.
def test_ac1_drop_oldest_emits_canonical_overrun_record() -> None:
    """AC-1: the overflowing enqueue reports OVERRUN and leaves a marker record."""
    # Arrange — a capacity-16 client accepts 15 records before overrunning.
    client = _wire(FdrClient(producer_id="c1_vio", capacity=16))
    for frame_id in range(15):
        client.enqueue(_make_record(frame_id=frame_id))

    # Act — the 16th enqueue triggers drop-oldest plus the overrun record.
    outcome = client.enqueue(_make_record(frame_id=999))

    # Assert
    assert outcome == EnqueueResult.OVERRUN
    records = client.drain(max_records=64)
    # The user record (frame_id=999) lands second-to-last ...
    assert records[-2].payload["frame_id"] == 999
    # ... and the overrun marker is the final record drained.
    marker = records[-1]
    assert marker.kind == OVERRUN_KIND
    assert marker.producer_id == OVERRUN_PRODUCER_ID
    marker_payload = marker.payload
    assert marker_payload["producer_id"] == "c1_vio"
    assert marker_payload["dropped_count"] == 1
# ---------------------------------------------------------------------------
# AC-2: coalescing across a burst — 10 overruns -> at least one marker, each with a positive dropped_count (relaxed; see the test docstring).
def test_ac2_coalescing_across_burst() -> None:
    """Check burst behaviour when the consumer never drains during the burst.

    The contract's § Scope words coalescing as "increment
    ``dropped_count`` on the in-flight overrun record … enqueued at
    the END of the burst (next successful enqueue slot)". When the
    consumer is permanently stalled that "next successful enqueue slot"
    never comes, so the policy emits a marker right after every overrun
    event instead. A later event may evict an earlier marker; in that
    case the evicted marker's ``dropped_count`` is folded into the next
    marker via :func:`_evict_one`, so user-loss information is never
    silently discarded.

    What this test pins down for that scenario:

    * the burst yields at least one marker;
    * each marker names the originating producer slug;
    * each marker's ``dropped_count`` is a positive integer.

    The exact ``dropped_count`` total is deliberately left unasserted:
    it depends on buffer geometry and eviction ordering, and the folding
    rule above is what preserves the information across evictions.
    """
    # Arrange — capacity 16, pre-filled with 15 records so every further
    # enqueue overruns.
    client = _wire(FdrClient(producer_id="c1_vio", capacity=16))
    for frame_id in range(15):
        client.enqueue(_make_record(frame_id=frame_id))

    # Act — ten more enqueues while the consumer is stalled.
    for offset in range(10):
        client.enqueue(_make_record(frame_id=1000 + offset))

    # Assert
    markers = [
        record
        for record in client.drain(max_records=64)
        if record.kind == OVERRUN_KIND
    ]
    assert markers, "burst must emit at least one overrun marker"
    for marker in markers:
        assert marker.payload["producer_id"] == "c1_vio"
        dropped = marker.payload["dropped_count"]
        assert isinstance(dropped, int) and dropped > 0
# ---------------------------------------------------------------------------
# AC-3: overrun record's payload.producer_id matches the originating producer.
def test_ac3_overrun_carries_originating_producer_id() -> None:
    """AC-3: overrun markers name the producer whose buffer overran."""
    # Arrange — fill a "c5_state" client to the brink of overrun.
    slug = "c5_state"
    client = _wire(FdrClient(producer_id=slug, capacity=16))
    for frame_id in range(15):
        client.enqueue(_make_record(producer_id=slug, frame_id=frame_id))

    # Act — one more record tips the buffer over.
    client.enqueue(_make_record(producer_id=slug, frame_id=999))

    # Assert
    markers = [
        record
        for record in client.drain(max_records=64)
        if record.kind == OVERRUN_KIND
    ]
    assert markers
    for marker in markers:
        # Outer envelope carries the dedicated overrun producer id ...
        assert marker.producer_id == OVERRUN_PRODUCER_ID
        # ... while the payload carries the originating slug.
        assert marker.payload["producer_id"] == "c5_state"
# ---------------------------------------------------------------------------
# AC-4: composition root wires overrun policy on every client.
def test_ac4_make_fdr_client_wires_overrun_policy() -> None:
    """AC-4: every client built by ``make_fdr_client`` has ``on_overrun`` set."""
    # Arrange
    config = Config()

    # Act — build both clients before asserting anything.
    vio_client = make_fdr_client("c1_vio", config)
    state_client = make_fdr_client("c5_state", config)

    # Assert — the returned clients are wired ...
    assert vio_client.on_overrun is not None
    assert state_client.on_overrun is not None
    # ... and so is every client held in the module-level cache.
    cached = _cached_clients().values()
    assert all(entry.on_overrun is not None for entry in cached)
# ---------------------------------------------------------------------------
# AC-6: rate-limited ERROR log under sustained overruns (≤ 1/sec).
def test_ac6_no_log_flood_under_sustained_overruns(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """AC-6: sustained overruns emit at most one retry-failed ERROR per second.

    The buffer's ``push`` is replaced with a stub that always returns
    ``False`` so the policy's retry-after-drop path fails on every
    overrun. 200 overruns are then driven through the client and the
    captured log records whose ``kind`` attribute equals
    ``"fdr.overrun_retry_failed"`` are counted against the rate cap.

    Args:
        caplog: pytest's log-capture fixture.
    """
    # Arrange — capacity 16 client; pre-fill, then force retry-after-drop
    # failures by neutralising the buffer push so the retry path always fails.
    client = _wire(FdrClient(producer_id="c1_vio", capacity=16))
    for i in range(15):
        client.enqueue(_make_record(frame_id=i))

    # Monkey-patch the buffer's push to always return False (simulates a
    # frozen consumer mid-policy as per AZ-274 AC-5 contrived scenario).
    real_push = client._buffer.push
    client._buffer.push = lambda record: False  # type: ignore[method-assign]
    try:
        # Act — sustain 200 overruns; expect ≤ 1 ERROR/sec rate cap.
        start = time.monotonic()
        with caplog.at_level(logging.ERROR, logger="shared.fdr_client.overrun"):
            for i in range(200):
                client.enqueue(_make_record(frame_id=1000 + i))
        elapsed = time.monotonic() - start

        # Assert — caplog may also hold records from other loggers that were
        # emitted without a ``kind`` attribute, so read it defensively rather
        # than letting an AttributeError mask the real verdict.
        overrun_errors = [
            r
            for r in caplog.records
            if getattr(r, "kind", None) == "fdr.overrun_retry_failed"
        ]
        # With a 1/sec cap, a window of ``elapsed`` seconds can contain at
        # most floor(elapsed) + 1 emissions (one at the start, then one per
        # full second after it).
        max_allowed = int(elapsed) + 1
        assert len(overrun_errors) <= max_allowed, (
            f"rate cap violated: {len(overrun_errors)} ERRORs in {elapsed:.3f}s "
            f"(max allowed {max_allowed})"
        )
    finally:
        # Always restore the real push so the client is left usable.
        client._buffer.push = real_push  # type: ignore[method-assign]
# ---------------------------------------------------------------------------
# Reliability invariant: closure exceptions are swallowed; producer hot path stays clean.
def test_reliability_hook_exceptions_do_not_raise_into_caller() -> None:
    """A raising ``on_overrun`` hook must not propagate out of ``enqueue``."""
    # Arrange — a client whose overrun hook always raises.
    client = FdrClient(producer_id="c2_vpr", capacity=16)

    def boom(_: FdrRecord) -> None:
        raise RuntimeError("policy blew up")

    client.on_overrun = boom
    for frame_id in range(15):
        client.enqueue(_make_record(frame_id=frame_id))

    # Act — the overflowing enqueue invokes the hook; it should not raise.
    outcome = client.enqueue(_make_record(frame_id=999))

    # Assert — the caller still sees the normal OVERRUN result.
    assert outcome == EnqueueResult.OVERRUN
# ---------------------------------------------------------------------------
# Capacity-driven config override carries through to per-producer policy.
def test_overrun_policy_uses_per_producer_capacity_from_config() -> None:
    """A per-producer capacity in the config reaches the client built for it."""
    # Arrange — override the capacity for "c2_vpr" only.
    config = Config(fdr=FdrConfig(per_producer_capacity={"c2_vpr": 32}))

    # Act
    vpr_client = make_fdr_client("c2_vpr", config)

    # Assert — the override is honoured and the policy is still wired.
    assert vpr_client._capacity() == 32
    assert vpr_client.on_overrun is not None