mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 22:31:13 +00:00
ba20c2d195
AZ-273: lock-free SPSC ring buffer with pre-allocated slots, power-of- two capacity, opt-in SPSC guard, and EnqueueResult / FdrSpscViolationError on the public surface. make_fdr_client caches one client per producer_id and reads capacity from config.fdr.per_producer_capacity with fallback to queue_size. AZ-274: default_overrun_policy implements drop-oldest + retry + immediate marker emission, with prior-marker dropped_count folding via _evict_one so user-loss info is never lost across iterations. ERROR diagnostic is rate-limited to <=1/sec per producer. AZ-275: FakeFdrSink mirrors the FdrClient public surface and reuses the production default_overrun_policy via a duck-typed _PolicyAdapter. The test-only records/all_records_ever properties let component tests assert both in-buffer and lifetime state. tests/conftest.py registers the fake_fdr_sink fixture and an AST architecture lint forbids production imports of fakes. AZ-267: FdrLogBridgeHandler installs on the root logger via wire_log_bridge and forwards only WARN+ERROR records into the FDR with kind="log". Thread-local recursion guard short-circuits internal logging; saturated- queue diagnostics go to stderr every N=1000 drops. AZ-268: tests/contract/log_schema.py covers every row of the schema's Test Cases table plus the "DEBUG+INFO never reach FDR" invariant. pyproject.toml registers the contract pytest marker and the contract-mandated log_schema.py file-name. 251 unit + contract tests pass (48 new). Review verdict: PASS_WITH_WARNINGS; findings are NFR-perf deferrals + documented relaxation of AZ-274 AC-2 coalescing under permanently-stalled consumer. Co-authored-by: Cursor <cursoragent@cursor.com>
298 lines
8.5 KiB
Python
298 lines
8.5 KiB
Python
"""Contract test for ``log_record_schema`` v1.0.0 (AZ-268 / E-CC-LOG).
|
|
|
|
Verifies every test case in
|
|
``_docs/02_document/contracts/shared_logging/log_record_schema.md
|
|
§ Test Cases`` plus the "DEBUG + INFO never reach FDR" invariant via
|
|
the bridge + FakeFdrSink.
|
|
|
|
File path is fixed at ``tests/contract/log_schema.py`` per epic
|
|
AC-4 so the traceability matrix reference stays stable.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import logging
|
|
from collections.abc import Iterator
|
|
from typing import Final
|
|
|
|
import pytest
|
|
|
|
from gps_denied_onboard.fdr_client.fakes import FakeFdrSink
|
|
from gps_denied_onboard.logging.fdr_bridge import wire_log_bridge
|
|
from gps_denied_onboard.logging.structured import (
|
|
JsonFormatter,
|
|
configure_logging,
|
|
get_logger,
|
|
)
|
|
|
|
# Module-level pytest mark: every test in this file carries the ``contract`` marker.
pytestmark = pytest.mark.contract


# Contract version pin (AC-4). If the contract major version bumps,
# this constant must update in lock-step — review-time gate.
CONTRACT_VERSION: Final[str] = "1.0.0"

# Authoritative field order from the contract.
# ``test_case_ordering_stable`` compares the key order of each emitted
# JSON line against this tuple, so any reorder here or in the formatter
# shows up as a test failure.
EXPECTED_FIELD_ORDER: Final[tuple[str, ...]] = (
    "ts",
    "level",
    "component",
    "frame_id",
    "kind",
    "msg",
    "kv",
    "exc",
)
|
|
|
|
|
|
def _capture_one_line(logger_name: str, log_fn_name: str, /, **extra: object) -> dict:
    """Log one record through ``logger_name`` and return it as a parsed dict.

    A throwaway ``StreamHandler`` carrying the contract's ``JsonFormatter``
    is attached only for the duration of the call; the handler is detached
    and the logger's level is put back afterwards so nothing leaks between
    tests.

    Args:
        logger_name: Name of the stdlib logger to emit on.
        log_fn_name: Name of the logger method to call (``"info"``,
            ``"warning"``, ``"exception"``, ...).
        **extra: Keyword arguments passed straight to that method
            (the tests use ``msg=...`` and ``extra={...}``).

    Returns:
        The single emitted line decoded with ``json.loads``.

    Raises:
        AssertionError: If the call did not yield exactly one non-blank line.
    """
    stream = io.StringIO()
    json_handler = logging.StreamHandler(stream)
    json_handler.setFormatter(JsonFormatter())
    json_handler.setLevel(logging.DEBUG)

    log = logging.getLogger(logger_name)
    log.addHandler(json_handler)
    previous_level = log.level
    # Open the logger fully so DEBUG/INFO calls are not filtered before
    # they reach the capture handler.
    log.setLevel(logging.DEBUG)
    try:
        emit = getattr(log, log_fn_name)
        emit(**extra)
    finally:
        log.removeHandler(json_handler)
        log.setLevel(previous_level)

    lines = [text for text in stream.getvalue().splitlines() if text.strip()]
    assert len(lines) == 1, f"expected one line, got {len(lines)}: {lines!r}"
    (only_line,) = lines
    return json.loads(only_line)
|
|
|
|
|
|
def _capture_one_line_raw(logger_name: str, log_fn_name: str, /, **extra: object) -> str:
    """Log one record through ``logger_name`` and return the raw emitted line.

    Mirrors :func:`_capture_one_line` but skips JSON decoding, for tests
    that need to inspect the serialized text itself (e.g. key order).
    A temporary ``StreamHandler`` with the contract's ``JsonFormatter`` is
    attached for the call and removed afterwards, and the logger's level
    is restored.

    Args:
        logger_name: Name of the stdlib logger to emit on.
        log_fn_name: Name of the logger method to call (``"info"``,
            ``"exception"``, ...).
        **extra: Keyword arguments passed straight to that method.

    Returns:
        The single non-blank line written by the formatter.

    Raises:
        AssertionError: If the call did not yield exactly one non-blank line.
    """
    buf = io.StringIO()
    handler = logging.StreamHandler(buf)
    handler.setFormatter(JsonFormatter())
    handler.setLevel(logging.DEBUG)

    target = logging.getLogger(logger_name)
    target.addHandler(handler)
    original_level = target.level
    target.setLevel(logging.DEBUG)
    try:
        getattr(target, log_fn_name)(**extra)
    finally:
        target.removeHandler(handler)
        target.setLevel(original_level)

    lines = [ln for ln in buf.getvalue().splitlines() if ln.strip()]
    # Same guard as the parsed variant: without it, zero output surfaces as
    # a bare IndexError and surplus lines would be silently discarded.
    assert len(lines) == 1, f"expected one line, got {len(lines)}: {lines!r}"
    return lines[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-4: contract version pinned.
|
|
|
|
|
|
def test_ac4_contract_version_pinned() -> None:
    """Guard the ``CONTRACT_VERSION`` pin against unreviewed edits.

    The check compares the module-level constant with a literal, so it
    fails whenever someone changes the pin without also touching this
    test — a prompt to re-review the test cases below. It does not read
    the contract document itself.
    """
    # Arrange
    review_hint = (
        "log_record_schema contract version bumped; review test cases below "
        "before updating CONTRACT_VERSION."
    )

    # Act / Assert
    assert CONTRACT_VERSION == "1.0.0", review_hint
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Contract test cases: § Test Cases table.
|
|
|
|
|
|
def test_case_valid_info_no_frame() -> None:
    """INFO record without a frame: ``frame_id``/``exc`` are null, ``kv`` is echoed."""
    # Arrange
    fields = {"component": "c2_vpr", "kind": "vpr.warmup", "kv": {"model": "salad"}}

    # Act
    record = _capture_one_line("c2_vpr", "info", msg="loaded model", extra=fields)

    # Assert
    assert record["level"] == "INFO"
    assert record["component"] == "c2_vpr"
    assert record["kind"] == "vpr.warmup"
    assert record["frame_id"] is None
    assert record["exc"] is None
    assert record["kv"] == {"model": "salad"}
|
|
|
|
|
|
def test_case_valid_warn_with_frame() -> None:
    """WARNING record with a frame: level is rendered ``WARN`` and ``frame_id`` is kept."""
    # Arrange
    fields = {
        "component": "c5_state",
        "frame_id": 4321,
        "kind": "state.cov_spike",
        "kv": {"jump_factor": 5.2},
    }

    # Act
    record = _capture_one_line(
        "c5_state",
        "warning",
        msg="covariance jumped 5x",
        extra=fields,
    )

    # Assert
    assert record["level"] == "WARN"  # WARNING -> WARN per contract
    assert record["frame_id"] == 4321
    assert record["kv"] == {"jump_factor": 5.2}
|
|
|
|
|
|
def test_case_valid_error_with_exc() -> None:
    """ERROR record logged from an ``except`` block carries the exception in ``exc``."""
    # Arrange
    fields = {
        "component": "c11_tilemanager",
        "kind": "tile.upload_fail",
        "kv": {"tile": "z18/x12345/y67890"},
    }

    # Act — ``Logger.exception`` must run while the exception is being
    # handled so the active exception info is attached to the record.
    try:
        raise RuntimeError("HTTP 503")
    except RuntimeError:
        raw = _capture_one_line_raw(
            "c11_tilemanager",
            "exception",
            msg="upload failed",
            extra=fields,
        )

    # Assert
    record = json.loads(raw)
    assert record["level"] == "ERROR"
    exc_text = record["exc"]
    assert exc_text is not None
    assert "RuntimeError" in exc_text
|
|
|
|
|
|
def test_case_invalid_multiline_msg_is_collapsed() -> None:
    """A message containing a newline comes out as one space-joined line."""
    # Arrange
    fields = {"component": "c5_state", "kind": "test"}

    # Act
    record = _capture_one_line("c5_state", "info", msg="line1\nline2", extra=fields)

    # Assert
    collapsed = record["msg"]
    assert "\n" not in collapsed
    assert collapsed == "line1 line2"
|
|
|
|
|
|
def test_case_invalid_non_serialisable_kv_falls_back_to_format_error() -> None:
    """A ``kv`` value that cannot be JSON-encoded yields a ``_format_error`` key."""

    # Arrange
    class _NotSerialisable:
        """Plain object with no JSON representation."""

    fields = {
        "component": "c5_state",
        "kind": "test",
        "kv": {"obj": _NotSerialisable()},
    }

    # Act
    record = _capture_one_line("c5_state", "info", msg="oops", extra=fields)

    # Assert
    assert "_format_error" in record["kv"]
|
|
|
|
|
|
def test_case_ordering_stable() -> None:
    """Serialized key order matches ``EXPECTED_FIELD_ORDER`` for every record."""

    # Arrange — emit several records with deliberately scrambled extra ordering.
    def _emit(i: int) -> str:
        fields = {
            "component": "c2_vpr",
            "kind": "test",
            "kv": {"i": i},
            "frame_id": i,
        }
        return _capture_one_line_raw("c2_vpr", "info", msg=f"line {i}", extra=fields)

    raws = [_emit(i) for i in range(3)]

    # Act — parse with object_pairs_hook to preserve key order from the raw bytes.
    # The hook replaces each JSON object (nested ones included) with the list
    # of its keys, so the top-level result is the record's key sequence.
    def _keys_in_order(pairs: list[tuple[str, object]]) -> list[str]:
        return [key for key, _value in pairs]

    # Assert
    for raw in raws:
        key_order = json.loads(raw, object_pairs_hook=_keys_in_order)
        assert tuple(key_order) == EXPECTED_FIELD_ORDER, (
            f"key order drifted: got {tuple(key_order)} vs expected {EXPECTED_FIELD_ORDER}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-3: DEBUG + INFO never reach FDR.
|
|
|
|
|
|
@pytest.fixture
def isolated_logger() -> Iterator[None]:
    """Save the suppression-test logger's handlers and level, restore them after.

    Yields nothing; the fixture exists only for its setup/teardown effect on
    the ``contract.log_schema.suppression`` logger.
    """
    target = logging.getLogger("contract.log_schema.suppression")
    # Copy the list: the logger mutates its own ``handlers`` list in place.
    handlers_before, level_before = list(target.handlers), target.level

    yield

    target.handlers = handlers_before
    target.setLevel(level_before)
|
|
|
|
|
|
def test_ac3_debug_and_info_never_reach_fdr(isolated_logger: None) -> None:
    """With the FDR bridge wired, DEBUG and INFO records leave the sink empty.

    The logger is opened to DEBUG so the records are not dropped by the
    logger's own level before any bridge handler could see them.
    """
    # Arrange
    logger_name = "contract.log_schema.suppression"
    sink = FakeFdrSink(producer_id=logger_name)
    wire_log_bridge(lambda _component: sink, target_loggers=(logger_name,))
    logger = get_logger(logger_name)
    logger.setLevel(logging.DEBUG)

    # Act
    for _ in range(100):
        logger.info("INFO record")
        logger.debug("DEBUG record")

    # Assert — neither the lifetime view nor the in-buffer view saw anything.
    assert sink.all_records_ever == []
    assert sink.records == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC-2: schema-drift fails fast (the test itself is the gate).
|
|
# This is documented elsewhere as "any reorder breaks test_case_ordering_stable above".
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Smoke: configure_logging is idempotent (regression guard).
|
|
|
|
|
|
def test_configure_logging_is_idempotent() -> None:
    """Calling ``configure_logging`` twice leaves the root handler count unchanged.

    The root logger's handlers and level are captured up front and restored
    in ``finally`` so the global logging state is not altered for other tests.
    """
    # Arrange
    root = logging.getLogger()
    handlers_before = list(root.handlers)
    level_before = root.level

    try:
        # Act — configure twice, recording the handler count after each call.
        handler_counts = []
        for _ in range(2):
            configure_logging(tier=1, level="INFO")
            handler_counts.append(len(root.handlers))
        first_count, second_count = handler_counts

        # Assert
        assert first_count == second_count, "re-configuring stacked handlers"
    finally:
        root.handlers = handlers_before
        root.setLevel(level_before)
|