[AZ-273] [AZ-274] [AZ-275] [AZ-267] [AZ-268] FDR producer chain + log bridge + contract test

AZ-273: lock-free SPSC ring buffer with pre-allocated slots, power-of-
two capacity, opt-in SPSC guard, and EnqueueResult / FdrSpscViolationError
on the public surface. make_fdr_client caches one client per producer_id
and reads capacity from config.fdr.per_producer_capacity with fallback
to queue_size.
AZ-274: default_overrun_policy implements drop-oldest + retry + immediate
marker emission, with prior-marker dropped_count folding via _evict_one
so user-loss info is never lost across iterations. ERROR diagnostic is
rate-limited to <=1/sec per producer.
AZ-275: FakeFdrSink mirrors the FdrClient public surface and reuses the
production default_overrun_policy via a duck-typed _PolicyAdapter. The
test-only records/all_records_ever properties let component tests assert
both in-buffer and lifetime state. tests/conftest.py registers the
fake_fdr_sink fixture and an AST architecture lint forbids production
imports of fakes.
AZ-267: FdrLogBridgeHandler installs on the root logger via wire_log_bridge
and forwards only WARN+ERROR records into the FDR with kind="log".
Thread-local recursion guard short-circuits internal logging; saturated-
queue diagnostics go to stderr every N=1000 drops.
AZ-268: tests/contract/log_schema.py covers every row of the schema's
Test Cases table plus the "DEBUG+INFO never reach FDR" invariant.
pyproject.toml registers the contract pytest marker and the
contract-mandated log_schema.py file-name.
251 unit + contract tests pass (48 new). Review verdict:
PASS_WITH_WARNINGS; findings are NFR-perf deferrals + documented
relaxation of AZ-274 AC-2 coalescing under permanently-stalled consumer.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 03:00:49 +03:00
parent 3acc7f33dd
commit ba20c2d195
24 changed files with 2714 additions and 20 deletions
+5 -2
View File
@@ -50,13 +50,16 @@ class LogConfig:
class FdrConfig:
"""Cross-cutting Flight Data Recorder block (E-CC-FDR-CLIENT / AZ-247).
The producer-side ring-buffer fields below are documented defaults
consumed by AZ-273; only the outer container is owned by AZ-269.
``queue_size`` is the documented default capacity for every producer.
``per_producer_capacity`` carries per-producer overrides keyed by
producer slug (consumed by AZ-273 ``make_fdr_client``); blocks
that omit a producer fall back to ``queue_size``.
"""
queue_size: int = 4096
overrun_policy: str = "drop_oldest"
path: str = "/var/lib/gps-denied/fdr"
per_producer_capacity: Mapping[str, int] = field(default_factory=dict)
@dataclass(frozen=True)
@@ -4,7 +4,12 @@ Producer-side API used by every component. Consumer-side writer lives in
`gps_denied_onboard.components.c13_fdr` (AZ-248 / E-C13).
"""
from gps_denied_onboard.fdr_client.client import FdrClient
from gps_denied_onboard.fdr_client.client import (
EnqueueResult,
FdrClient,
FdrSpscViolationError,
make_fdr_client,
)
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
KNOWN_KINDS,
@@ -23,9 +28,12 @@ __all__ = [
"MAX_INLINE_BLOB_BYTES",
"OVERRUN_KIND",
"OVERRUN_PRODUCER_ID",
"EnqueueResult",
"FdrClient",
"FdrRecord",
"FdrSchemaError",
"FdrSpscViolationError",
"make_fdr_client",
"parse",
"serialise",
]
+217 -6
View File
@@ -1,16 +1,227 @@
"""FDR producer-side client API — STUB.
"""``FdrClient`` — producer-side FDR queue (AZ-273 / E-CC-FDR-CLIENT).
Concrete client is owned by AZ-273. Bootstrap exposes the class so every component
can type `fdr: FdrClient` on its constructor.
The single handle every onboard producer holds. Calls :meth:`enqueue`
on its component-local frame; never blocks. The consumer side is the
C13 writer thread (AZ-248) which drains via :meth:`pop_one` /
:meth:`drain`.
Public surface frozen by
``_docs/02_document/contracts/shared_fdr_client/fdr_client_protocol.md``
v1.0.0.
Capacity sourcing
-----------------
``make_fdr_client(producer_id, config)`` resolves the per-producer
capacity in this precedence order:
1. ``config.fdr.per_producer_capacity[producer_id]`` if present.
2. ``config.fdr.queue_size`` (the documented cross-cutting default).
If the resolved value is not a positive power of two, it is rounded UP
to the next power of two (and clipped to :data:`MIN_CAPACITY`).
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
from gps_denied_onboard.config import Config
from gps_denied_onboard.fdr_client.queue import (
MIN_CAPACITY,
FdrSpscViolationError,
SpscRingBuffer,
)
from gps_denied_onboard.fdr_client.records import FdrRecord
from gps_denied_onboard.logging import get_logger
__all__ = [
"EnqueueResult",
"FdrClient",
"FdrSpscViolationError",
"make_fdr_client",
]
class EnqueueResult:
"""Return value of :meth:`FdrClient.enqueue`.
String enum (not :class:`enum.Enum`) so the steady-state path
returns an interned string instead of allocating a wrapper.
"""
OK: Final[str] = "ok"
OVERRUN: Final[str] = "overrun"
_DIAG_LOGGER_NAME: Final[str] = "shared.fdr_client"
def _next_power_of_two(value: int) -> int:
"""Round ``value`` UP to the next power of two (clipped to ``MIN_CAPACITY``)."""
if value <= MIN_CAPACITY:
return MIN_CAPACITY
if value & (value - 1) == 0:
return value
return 1 << value.bit_length()
class FdrClient:
"""Producer-side FDR API: enqueue records, drop-oldest on overrun."""
"""Producer-side FDR queue handle for a single component."""
def emit(self, record: FdrRecord) -> None:
raise NotImplementedError("FdrClient.emit concrete impl is AZ-273")
def __init__(
self,
producer_id: str,
capacity: int,
*,
enforce_spsc: bool = False,
_emit_diag_log: bool = True,
) -> None:
if not isinstance(producer_id, str) or not producer_id:
raise ValueError(
f"FdrClient.producer_id must be a non-empty string; got {producer_id!r}"
)
normalised_capacity = _next_power_of_two(capacity)
self._buffer: SpscRingBuffer = SpscRingBuffer(
normalised_capacity, enforce_spsc=enforce_spsc
)
self._producer_id: str = producer_id
self._on_overrun: Callable[[FdrRecord], None] | None = None
if _emit_diag_log:
# One-time INFO at construction (NOT on the steady-state path).
get_logger(_DIAG_LOGGER_NAME).info(
"FdrClient constructed",
extra={
"component": _DIAG_LOGGER_NAME,
"kind": "fdr.client_constructed",
"kv": {"producer_id": producer_id, "capacity": normalised_capacity},
},
)
@property
def producer_id(self) -> str:
return self._producer_id
@property
def on_overrun(self) -> Callable[[FdrRecord], None] | None:
return self._on_overrun
@on_overrun.setter
def on_overrun(self, hook: Callable[[FdrRecord], None] | None) -> None:
if hook is not None and not callable(hook):
raise TypeError(f"on_overrun hook must be callable or None; got {hook!r}")
self._on_overrun = hook
def enqueue(self, record: FdrRecord) -> str:
"""Non-blocking single-producer enqueue.
Returns :attr:`EnqueueResult.OK` on success or
:attr:`EnqueueResult.OVERRUN` when the buffer is full.
On overrun, the ``on_overrun`` hook (if set) is invoked exactly
once with the offending record before returning.
"""
ok = self._buffer.push(record)
if ok:
return EnqueueResult.OK
hook = self._on_overrun
if hook is not None:
try:
hook(record)
except Exception:
# Overrun-path closure errors are swallowed to keep the
# producer's hot path free of exceptions. The policy's
# own error logging (AZ-274 NFR-reliability) records
# what went wrong.
pass
return EnqueueResult.OVERRUN
def pop_one(self) -> FdrRecord | None:
"""Single-consumer dequeue. Returns ``None`` when the buffer is empty."""
return self._buffer.pop()
def drain(self, max_records: int) -> list[FdrRecord]:
"""Single-consumer drain up to ``max_records`` records in FIFO order."""
return self._buffer.drain(max_records)
def flush(self) -> None:
"""Test-only: spin until the buffer is empty.
Production code MUST NOT call this on the hot path.
"""
while not self._buffer.is_empty():
pass
def drop_oldest_for_overrun(self) -> FdrRecord | None:
"""Producer-side drop-oldest used by the AZ-274 overrun policy.
Public so the overrun closure can pop the head without
violating the SPSC contract on the consumer side.
"""
return self._buffer.drop_oldest()
def _capacity(self) -> int:
"""Test-only introspection of the underlying buffer's capacity (AZ-273 AC-3)."""
return self._buffer.capacity
def _buffer_size(self) -> int:
"""Test-only introspection of the buffer's current size."""
return self._buffer.size()
_CACHE: dict[str, FdrClient] = {}
def _resolve_capacity(producer_id: str, config: Config) -> int:
"""Pick the per-producer capacity from config; fall back to the default."""
per_producer = getattr(config.fdr, "per_producer_capacity", {}) or {}
override = per_producer.get(producer_id) if isinstance(per_producer, dict) else None
if override is not None:
if not isinstance(override, int) or isinstance(override, bool):
raise ValueError(
f"config.fdr.per_producer_capacity[{producer_id!r}] must be a "
f"non-bool int; got {override!r}"
)
if override <= 0:
raise ValueError(
f"config.fdr.per_producer_capacity[{producer_id!r}] must be > 0; got {override}"
)
return override
return config.fdr.queue_size
def make_fdr_client(producer_id: str, config: Config) -> FdrClient:
"""Construct (or return cached) FdrClient for ``producer_id``.
Cached: two calls with the same ``producer_id`` return the same
instance for the lifetime of the process. ``_reset_for_tests()``
clears the cache.
"""
if not isinstance(producer_id, str) or not producer_id:
raise ValueError(
f"make_fdr_client.producer_id must be a non-empty string; got {producer_id!r}"
)
cached = _CACHE.get(producer_id)
if cached is not None:
return cached
capacity = _resolve_capacity(producer_id, config)
client = FdrClient(producer_id=producer_id, capacity=capacity)
# Wire the default drop-oldest overrun policy. Imported lazily so
# importing ``FdrClient`` for typing purposes does not pull in the
# policy module's logger handle.
from gps_denied_onboard.fdr_client.overrun_policy import default_overrun_policy
client.on_overrun = default_overrun_policy(client)
_CACHE[producer_id] = client
return client
def _reset_for_tests() -> None:
"""Clear the cache. Documented test-only entry per contract Non-Goals."""
_CACHE.clear()
def _cached_clients() -> dict[str, FdrClient]:
"""Test-only snapshot of the cache used by AZ-274 AC-4."""
return dict(_CACHE)
+173
View File
@@ -0,0 +1,173 @@
"""``FakeFdrSink`` test double for ``FdrClient`` (AZ-275 / E-CC-FDR-CLIENT).
Drop-in replacement that conforms to ``fdr_client_protocol`` v1.0.0's
public surface. Lets component tests assert on what their code emits
to the FDR without spinning up the C13 writer thread.
Production code MUST NOT import this module — an architecture-lint test
(``tests/unit/test_az275_fake_fdr_sink.py::test_production_does_not_import_fakes``)
asserts no ``src/gps_denied_onboard/**.py`` imports from this module.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Final
from gps_denied_onboard.fdr_client.records import FdrRecord
__all__ = ["FakeFdrSink"]
# Mirror of ``EnqueueResult`` so the fake doesn't need to import the
# production client (avoids forcing the queue module into test paths
# that only need the fake).
class _FakeEnqueueResult:
OK: Final[str] = "ok"
OVERRUN: Final[str] = "overrun"
class FakeFdrSink:
"""List-backed in-memory test double for :class:`FdrClient`.
Behaviour parity with the real client for the contract-relevant
subset (return values, ``on_overrun`` hook, ``producer_id``
preservation). Lock-free / allocation-free / SPSC guarantees are
NOT replicated — this is a fake, not a runtime queue.
"""
def __init__(
self,
producer_id: str,
capacity: int | None = None,
*,
with_default_overrun_policy: bool = False,
) -> None:
if not isinstance(producer_id, str) or not producer_id:
raise ValueError(
f"FakeFdrSink.producer_id must be a non-empty string; got {producer_id!r}"
)
if capacity is not None:
if not isinstance(capacity, int) or isinstance(capacity, bool):
raise TypeError(
f"capacity must be a non-bool int or None; got {type(capacity).__name__}"
)
if capacity <= 0:
raise ValueError(f"capacity must be > 0 when set; got {capacity}")
self._producer_id: str = producer_id
self._capacity: int | None = capacity
self._buffer: list[FdrRecord] = []
self._all: list[FdrRecord] = []
self._on_overrun: Callable[[FdrRecord], None] | None = None
if with_default_overrun_policy:
from gps_denied_onboard.fdr_client.overrun_policy import default_overrun_policy
# default_overrun_policy expects an ``FdrClient``; the fake
# exposes the same ``drop_oldest_for_overrun`` + ``producer_id``
# + ``_buffer.push`` surface, so a duck-typed adapter works.
self._on_overrun = default_overrun_policy(self._policy_adapter()) # type: ignore[arg-type]
@property
def producer_id(self) -> str:
return self._producer_id
@property
def on_overrun(self) -> Callable[[FdrRecord], None] | None:
return self._on_overrun
@on_overrun.setter
def on_overrun(self, hook: Callable[[FdrRecord], None] | None) -> None:
if hook is not None and not callable(hook):
raise TypeError(f"on_overrun hook must be callable or None; got {hook!r}")
self._on_overrun = hook
@property
def records(self) -> list[FdrRecord]:
"""In-buffer records, FIFO order (newest at the end)."""
return list(self._buffer)
@property
def all_records_ever(self) -> list[FdrRecord]:
"""Every record ever enqueued, INCLUDING records dropped by the policy."""
return list(self._all)
def enqueue(self, record: FdrRecord) -> str:
"""Append ``record`` to the buffer; invoke ``on_overrun`` on overflow."""
self._all.append(record)
if self._capacity is None or len(self._buffer) < self._capacity:
self._buffer.append(record)
return _FakeEnqueueResult.OK
hook = self._on_overrun
if hook is not None:
try:
hook(record)
except Exception:
pass
return _FakeEnqueueResult.OVERRUN
def pop_one(self) -> FdrRecord | None:
if not self._buffer:
return None
return self._buffer.pop(0)
def drain(self, max_records: int) -> list[FdrRecord]:
if max_records < 0:
raise ValueError(f"max_records must be >= 0; got {max_records}")
if max_records == 0:
return []
result = self._buffer[:max_records]
del self._buffer[:max_records]
return result
def flush(self) -> None:
self._buffer.clear()
def drop_oldest_for_overrun(self) -> FdrRecord | None:
"""Drop the oldest in-buffer record (used by ``default_overrun_policy``)."""
if not self._buffer:
return None
return self._buffer.pop(0)
def _policy_adapter(self) -> _PolicyAdapter:
"""Internal duck-typed adapter exposing the policy's expected surface."""
return _PolicyAdapter(self)
class _PolicyAdapter:
"""Bridges :class:`FakeFdrSink` to :func:`default_overrun_policy`'s API.
The production policy expects to call ``client.drop_oldest_for_overrun()``
and ``client._buffer.push(record)``. The fake exposes the same
semantics through this adapter so it can reuse the real policy
code verbatim (AZ-275 AC-4: overrun-policy parity).
"""
def __init__(self, sink: FakeFdrSink) -> None:
self._sink = sink
@property
def producer_id(self) -> str:
return self._sink.producer_id
def drop_oldest_for_overrun(self) -> FdrRecord | None:
return self._sink.drop_oldest_for_overrun()
@property
def _buffer(self) -> _FakeBufferShim:
return _FakeBufferShim(self._sink)
class _FakeBufferShim:
"""Shim that lets the policy call ``client._buffer.push(record)``."""
def __init__(self, sink: FakeFdrSink) -> None:
self._sink = sink
def push(self, record: FdrRecord) -> bool:
if self._sink._capacity is None or len(self._sink._buffer) < self._sink._capacity:
self._sink._buffer.append(record)
self._sink._all.append(record)
return True
return False
@@ -0,0 +1,185 @@
"""Drop-oldest + ``kind="overrun"`` emission policy (AZ-274 / E-CC-FDR-CLIENT).
Plugs into :attr:`FdrClient.on_overrun` (the contract's documented hook).
The closure:
1. Pops the oldest record from the buffer to make room (drop-oldest).
2. Retries the user's record once.
3. Coalesces consecutive overruns within a single burst; emits ONE
``kind="overrun"`` record at burst end carrying the originating
producer slug + ``dropped_count``.
4. ERROR-logs (rate-limited to ≤ 1/s) only when the retry-after-drop
ALSO fails — that path implies the consumer is making zero progress
and the diagnostic is genuinely actionable.
Public surface frozen by
``_docs/02_document/contracts/shared_fdr_client/fdr_client_protocol.md``
``on_overrun`` semantics + the
``_docs/02_document/contracts/shared_fdr_client/fdr_record_schema.md``
overrun payload shape.
"""
from __future__ import annotations
import datetime as _dt
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Final
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
OVERRUN_KIND,
OVERRUN_PRODUCER_ID,
FdrRecord,
)
from gps_denied_onboard.logging import get_logger
if TYPE_CHECKING:
from gps_denied_onboard.fdr_client.client import FdrClient
__all__ = ["default_overrun_policy"]
_DIAG_LOGGER_NAME: Final[str] = "shared.fdr_client.overrun"
# Minimum spacing between diagnostic ERRORs about retry-after-drop failures.
# AZ-274 AC-6: "≤ 1 ERROR/sec per FdrClient on sustained overruns".
_ERROR_LOG_MIN_INTERVAL_S: Final[float] = 1.0
def _iso_utc_now() -> str:
"""RFC 3339 UTC timestamp with microsecond precision and ``Z`` suffix."""
now = _dt.datetime.now(_dt.timezone.utc)
return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond:06d}Z"
@dataclass
class _BurstState:
"""Coalescing state for a single in-flight overrun burst."""
dropped_count: int = 0
originating_producer_id: str = ""
def reset(self) -> None:
self.dropped_count = 0
self.originating_producer_id = ""
def default_overrun_policy(client: FdrClient) -> Callable[[FdrRecord], None]:
"""Return the canonical drop-oldest closure for ``client``.
Wire into ``client.on_overrun``. Called by :meth:`FdrClient.enqueue`
exactly once per overrun event with the would-be-enqueued record.
"""
burst = _BurstState()
last_error_log_t: list[float] = [0.0] # boxed to be writable in the closure
def _evict_one(also_count_as_user_loss: bool) -> bool:
"""Drop one record from the buffer's head.
If the evicted record is itself a previously-emitted overrun
marker, fold its ``dropped_count`` into the in-flight burst so
no user-loss information is lost across iterations.
``also_count_as_user_loss=True`` adds 1 to ``burst.dropped_count``
if the evicted record is a user record. Set to False when the
eviction is bookkeeping overhead to make room for the marker.
Returns False iff the buffer was empty.
"""
dropped = client.drop_oldest_for_overrun()
if dropped is None:
return False
burst.originating_producer_id = client.producer_id
if isinstance(dropped, FdrRecord) and dropped.kind == OVERRUN_KIND:
inner_dc = dropped.payload.get("dropped_count")
if isinstance(inner_dc, int) and inner_dc > 0:
burst.dropped_count += inner_dc
elif also_count_as_user_loss:
burst.dropped_count += 1
return True
def policy(offending_record: FdrRecord) -> None:
# Make room for the user record. ``dropped_count`` counts USER
# records evicted; any extra evictions to land the overrun
# marker itself are NOT counted (bookkeeping overhead). When
# an evicted record is a prior marker, its count is folded
# into ``burst.dropped_count`` to preserve information.
evicted_for_user = _evict_one(also_count_as_user_loss=True)
# One retry only. If the consumer is so stalled that even
# drop-oldest didn't free a slot, log + give up.
retry_ok = client._buffer.push(offending_record)
if not retry_ok:
_log_retry_failure(client.producer_id, last_error_log_t)
burst.reset()
return
if not evicted_for_user or burst.dropped_count == 0:
# No user record was actually evicted (drop_oldest was a
# no-op because the consumer had drained the buffer between
# the OVERRUN return and the closure firing). Nothing to
# record — the retry already landed the user's record.
burst.reset()
return
# Emit the overrun marker immediately so the consumer sees it
# in causal order with the user record that just landed.
overrun_record = _build_overrun_record(
originating_producer_id=burst.originating_producer_id,
dropped_count=burst.dropped_count,
)
slipped = client._buffer.push(overrun_record)
if not slipped:
# Drop one MORE record so the marker fits. Marker-room
# drops never count as user loss themselves, but folded
# prior-marker counts are preserved by ``_evict_one``.
if not _evict_one(also_count_as_user_loss=False):
_log_retry_failure(client.producer_id, last_error_log_t)
burst.reset()
return
# Re-build because ``burst.dropped_count`` may have grown
# after folding a prior marker's count.
overrun_record = _build_overrun_record(
originating_producer_id=burst.originating_producer_id,
dropped_count=burst.dropped_count,
)
slipped = client._buffer.push(overrun_record)
if not slipped:
_log_retry_failure(client.producer_id, last_error_log_t)
burst.reset()
return
burst.reset()
return policy
def _build_overrun_record(originating_producer_id: str, dropped_count: int) -> FdrRecord:
"""Synthesise the canonical ``kind="overrun"`` record."""
return FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_utc_now(),
producer_id=OVERRUN_PRODUCER_ID,
kind=OVERRUN_KIND,
payload={
"producer_id": originating_producer_id,
"dropped_count": dropped_count,
},
)
def _log_retry_failure(producer_id: str, last_t: list[float]) -> None:
"""Emit at most one ERROR per ``_ERROR_LOG_MIN_INTERVAL_S`` per client."""
now = time.monotonic()
if now - last_t[0] < _ERROR_LOG_MIN_INTERVAL_S:
return
last_t[0] = now
get_logger(_DIAG_LOGGER_NAME).error(
"FDR overrun retry-after-drop failed",
extra={
"component": _DIAG_LOGGER_NAME,
"kind": "fdr.overrun_retry_failed",
"kv": {"producer_id": producer_id},
},
)
+173 -8
View File
@@ -1,21 +1,186 @@
"""Lock-free SPSC ring buffer — STUB.
"""Lock-free SPSC ring buffer (AZ-273 / E-CC-FDR-CLIENT).
Concrete drop-oldest-on-overrun ring buffer is owned by AZ-273.
Implements the producer-side queue for ``FdrClient``. Capacity is fixed at
construction, rounded UP to the next power of two so the wrap-around math
collapses to a bitwise AND.
Public surface frozen by
``_docs/02_document/contracts/shared_fdr_client/fdr_client_protocol.md``
v1.0.0.
Concurrency contract (SPSC):
* ONE producer thread MAY call :meth:`push` (lock-free under CPython GIL).
* ONE consumer thread MAY call :meth:`pop` / :meth:`drain` (lock-free).
* The producer thread MAY call :meth:`drop_oldest` from its on-overrun
closure; this method serialises with concurrent calls from itself via
an internal cold-path lock, but does NOT block the consumer's
:meth:`pop`. A simultaneous producer-drop + consumer-pop is the
documented race window: at worst ``dropped_count`` is off by one for
that one record. Tests run single-threaded around the overrun path so
the AC-1 / AC-2 / AC-5 of AZ-274 are deterministic.
The SPSC enforcement guard is opt-in (``enforce_spsc=True``); production
clients leave it off so the steady-state has zero monitoring overhead.
"""
from __future__ import annotations
from typing import Any
import threading
from typing import Any, Final
__all__ = [
"MIN_CAPACITY",
"FdrSpscViolationError",
"SpscRingBuffer",
]
# Minimum buffer slot count. Below this the contract's wrap-around math
# is degenerate and microbench results stop being representative.
MIN_CAPACITY: Final[int] = 16
class FdrSpscViolationError(RuntimeError):
"""Raised by the opt-in SPSC guard when more than one thread touches a side.
Attributes:
side: ``"producer"`` or ``"consumer"`` — which side was violated.
owner_thread_id: thread that first claimed the side.
offender_thread_id: thread that attempted the violating call.
"""
def __init__(self, side: str, owner_thread_id: int, offender_thread_id: int) -> None:
self.side: str = side
self.owner_thread_id: int = owner_thread_id
self.offender_thread_id: int = offender_thread_id
super().__init__(
f"SPSC violation on {side!r} side: thread {offender_thread_id} attempted "
f"to operate a buffer owned by thread {owner_thread_id}"
)
class SpscRingBuffer:
"""Single-producer single-consumer lock-free ring buffer."""
"""Single-producer single-consumer lock-free ring buffer.
def __init__(self, capacity: int) -> None:
self.capacity = capacity
Storage is a fixed-size Python ``list``; slots are pre-populated with
``None``. The producer writes to ``_slots[_tail]`` then advances
``_tail``; the consumer reads from ``_slots[_head]`` then advances
``_head``. Under CPython's GIL each single attribute store is atomic,
so ``_head`` and ``_tail`` are safe to read across threads.
Capacity is rounded up to a power of two so the modular arithmetic
can use a bitwise AND on ``capacity - 1``.
"""
def __init__(self, capacity: int, *, enforce_spsc: bool = False) -> None:
if not isinstance(capacity, int) or isinstance(capacity, bool):
raise TypeError(f"capacity must be a non-bool int; got {type(capacity).__name__}")
if capacity < MIN_CAPACITY:
raise ValueError(f"capacity must be >= {MIN_CAPACITY}; got {capacity}")
if capacity & (capacity - 1) != 0:
raise ValueError(f"capacity must be a power of two; got {capacity}")
self._capacity: int = capacity
self._mask: int = capacity - 1
self._slots: list[Any] = [None] * capacity
self._head: int = 0
self._tail: int = 0
self._enforce_spsc: bool = enforce_spsc
self._producer_thread_id: int | None = None
self._consumer_thread_id: int | None = None
# Cold-path lock used only inside drop_oldest to serialise multiple
# producer-side overrun events; the consumer's pop intentionally
# does NOT acquire it (see module docstring race window note).
self._drop_lock: threading.Lock = threading.Lock()
@property
def capacity(self) -> int:
return self._capacity
def is_empty(self) -> bool:
return self._head == self._tail
def is_full(self) -> bool:
return ((self._tail + 1) & self._mask) == self._head
def size(self) -> int:
return (self._tail - self._head) & self._mask
def push(self, item: Any) -> bool:
raise NotImplementedError("FdrClient ring-buffer concrete impl is AZ-273")
"""Producer push. Returns ``False`` when the buffer is full."""
if self._enforce_spsc:
self._claim_side(producer=True)
tail = self._tail
next_tail = (tail + 1) & self._mask
if next_tail == self._head:
return False
self._slots[tail] = item
self._tail = next_tail
return True
def pop(self) -> Any | None:
raise NotImplementedError("FdrClient ring-buffer concrete impl is AZ-273")
"""Consumer pop. Returns ``None`` when the buffer is empty."""
if self._enforce_spsc:
self._claim_side(producer=False)
head = self._head
if head == self._tail:
return None
item = self._slots[head]
self._slots[head] = None
self._head = (head + 1) & self._mask
return item
def drop_oldest(self) -> Any | None:
"""Producer-side drop of the oldest queued record (cold path).
Returns the dropped item, or ``None`` if the buffer is empty.
Serialised against concurrent calls from itself via
``_drop_lock``. NOT serialised against the consumer's
:meth:`pop` — see the module docstring's race window note.
"""
with self._drop_lock:
head = self._head
if head == self._tail:
return None
item = self._slots[head]
self._slots[head] = None
self._head = (head + 1) & self._mask
return item
def drain(self, max_records: int) -> list[Any]:
"""Consumer drain up to ``max_records`` items in FIFO order."""
if not isinstance(max_records, int) or isinstance(max_records, bool):
raise TypeError(f"max_records must be a non-bool int; got {type(max_records).__name__}")
if max_records < 0:
raise ValueError(f"max_records must be >= 0; got {max_records}")
result: list[Any] = []
for _ in range(max_records):
item = self.pop()
if item is None:
break
result.append(item)
return result
def _claim_side(self, *, producer: bool) -> None:
"""SPSC guard: bind a side to the first thread that touches it."""
current = threading.get_ident()
if producer:
owner = self._producer_thread_id
if owner is None:
self._producer_thread_id = current
return
if owner != current:
raise FdrSpscViolationError(
side="producer", owner_thread_id=owner, offender_thread_id=current
)
else:
owner = self._consumer_thread_id
if owner is None:
self._consumer_thread_id = current
return
if owner != current:
raise FdrSpscViolationError(
side="consumer", owner_thread_id=owner, offender_thread_id=current
)
+9 -2
View File
@@ -1,8 +1,15 @@
"""Structured JSON logging entrypoint (E-CC-LOG / AZ-245 / AZ-266).
Public surface — every component imports `get_logger` from here. The
handler topology is selected by `configure_logging(tier=...)` at the
Public surface — every component imports ``get_logger`` from here. The
handler topology is selected by ``configure_logging(tier=...)`` at the
composition-root entrypoint.
The FDR log bridge (AZ-267) is wiring code used only by the composition
root. It is intentionally NOT re-exported here: importing it would
trigger a circular import (the bridge depends on ``fdr_client.client``,
which itself logs via ``get_logger``). Composition-root code imports
``from gps_denied_onboard.logging.fdr_bridge import wire_log_bridge``
explicitly.
"""
from gps_denied_onboard.logging.structured import (
@@ -0,0 +1,263 @@
"""FDR log bridge — forwards WARN + ERROR records into the FDR (AZ-267).
A :class:`logging.Handler` subclass installed on the root onboard logger
(or each named logger). For every emitted record at level WARN or
ERROR, it builds a ``kind="log"`` :class:`FdrRecord` carrying the
schema-mandated 7 fields and enqueues into a producer-side
:class:`FdrClient`. INFO and DEBUG records are dropped at the handler's
level filter — they never reach the FDR (AC-3).
Public surface frozen by
``_docs/02_document/contracts/shared_logging/log_record_schema.md``
v1.0.0 (the wire shape of the resulting record's payload) and
``_docs/02_document/contracts/shared_fdr_client/fdr_client_protocol.md``
v1.0.0 (the queue we enqueue into).
Concurrency
-----------
* The handler may run on any logger thread.
* A thread-local recursion guard short-circuits any logging call that
originates from inside the handler itself — without this guard, a
failure path that emits a diagnostic WARN would recurse through the
same handler indefinitely.
* Saturated-queue diagnostics throttle to 1-per-``_DROP_LOG_EVERY_N``
occurrences via stdout (not via the bridge) to avoid the same loop.
"""
from __future__ import annotations
import datetime as _dt
import logging
import sys
import threading
from collections.abc import Callable
from typing import Any, Final
from gps_denied_onboard.fdr_client.client import EnqueueResult
from gps_denied_onboard.fdr_client.records import (
CURRENT_SCHEMA_VERSION,
FdrRecord,
)
# The resolver's return value must expose ``enqueue(record) -> str``; both
# the production ``FdrClient`` and the test ``FakeFdrSink`` satisfy that.
# Typed as ``Any`` here to avoid pulling FakeFdrSink (a tests-only fake)
# into the production module's type contract.
_FdrLikeResolver = Callable[[str], Any]
__all__ = [
"FdrLogBridgeHandler",
"wire_log_bridge",
]
_BRIDGE_HANDLER_MARKER_ATTR: Final[str] = "_gps_denied_fdr_bridge_handler"
# Throttle the stdout "queue saturated" diagnostic to one in every N
# occurrences per NFR-reliability ("at least N>=1000").
_DROP_LOG_EVERY_N: Final[int] = 1000
# Level-name normalisation matching the schema contract (``WARNING`` ->
# ``WARN``). Mirrors the logic in ``logging.structured._normalise_level``;
# duplicated locally to keep this module self-contained and avoid a
# circular import.
def _normalise_level(stdlib_levelname: str) -> str:
if stdlib_levelname == "WARNING":
return "WARN"
return stdlib_levelname
def _iso_utc(created_epoch: float) -> str:
"""RFC 3339 UTC with microsecond precision + ``Z`` suffix."""
dt = _dt.datetime.fromtimestamp(created_epoch, tz=_dt.timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond:06d}Z"
# Reserved stdlib LogRecord attributes that must not leak into the FDR
# record's payload.kv. Mirrors the same list in
# ``logging.structured._RESERVED_LOG_RECORD_KEYS``.
_RESERVED_LOG_RECORD_KEYS: Final[frozenset[str]] = frozenset(
{
"args",
"asctime",
"created",
"exc_info",
"exc_text",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"message",
"module",
"msecs",
"msg",
"name",
"pathname",
"process",
"processName",
"relativeCreated",
"stack_info",
"taskName",
"thread",
"threadName",
"frame_id",
"kind",
"kv",
"component",
}
)
def _coerce_jsonable(value: Any) -> Any:
"""Mirror of the formatter's coercer; raises on non-JSON-safe types."""
if isinstance(value, (str, int, float, bool)) or value is None:
return value
if isinstance(value, dict):
return {str(k): _coerce_jsonable(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_coerce_jsonable(v) for v in value]
raise TypeError(f"unserialisable kv payload type: {type(value).__name__}")
class FdrLogBridgeHandler(logging.Handler):
"""Forwards WARN + ERROR :class:`logging.LogRecord` into the FDR.
Constructed with a callable that resolves to the per-record
:class:`FdrClient`. The callable lets the composition root inject
either a per-component sink (one FdrClient per component) or a
shared "log fan-out" client; either way the bridge stays decoupled
from the registry.
"""
_recursion_local = threading.local()
def __init__(
self,
fdr_client_resolver: _FdrLikeResolver,
*,
level: int = logging.WARNING,
) -> None:
super().__init__(level=level)
if not callable(fdr_client_resolver):
raise TypeError(f"fdr_client_resolver must be callable; got {fdr_client_resolver!r}")
self._resolver: _FdrLikeResolver = fdr_client_resolver
self._drop_counter: int = 0
self._drop_lock: threading.Lock = threading.Lock()
setattr(self, _BRIDGE_HANDLER_MARKER_ATTR, True)
def emit(self, record: logging.LogRecord) -> None:
if getattr(self._recursion_local, "in_bridge", False):
# Short-circuit: a log call originating from inside the
# bridge (saturated-queue diagnostic, etc.) must NOT loop.
return
self._recursion_local.in_bridge = True
try:
self._emit_unguarded(record)
finally:
self._recursion_local.in_bridge = False
def _emit_unguarded(self, record: logging.LogRecord) -> None:
try:
fdr_record = self._build_fdr_record(record)
except Exception as exc:
# Translation failed (e.g. non-serialisable kv that even the
# formatter's fallback didn't catch). Skip this record;
# never raise into the caller.
self._note_drop(reason=f"translate_error: {exc}")
return
component = fdr_record.payload.get("component") or record.name
try:
client = self._resolver(component if isinstance(component, str) else record.name)
except Exception as exc:
self._note_drop(reason=f"resolve_error: {exc}")
return
result = client.enqueue(fdr_record)
if result == EnqueueResult.OVERRUN:
self._note_drop(reason="queue_saturated")
def _build_fdr_record(self, record: logging.LogRecord) -> FdrRecord:
rec_dict = record.__dict__
component = rec_dict.get("component") or record.name
kind = rec_dict.get("kind") or "log.diag"
frame_id = rec_dict.get("frame_id")
explicit_kv = rec_dict.get("kv")
if explicit_kv is None:
kv_raw: dict[str, Any] = {
k: v
for k, v in rec_dict.items()
if k not in _RESERVED_LOG_RECORD_KEYS and not k.startswith("_")
}
else:
kv_raw = dict(explicit_kv)
try:
kv_safe = _coerce_jsonable(kv_raw)
except (TypeError, ValueError) as exc:
kv_safe = {"_format_error": f"{type(exc).__name__}: {exc}"}
exc_text: str | None = None
if record.exc_info:
exc_text = logging.Formatter().formatException(record.exc_info)
return FdrRecord(
schema_version=CURRENT_SCHEMA_VERSION,
ts=_iso_utc(record.created),
producer_id=component,
kind="log",
payload={
"level": _normalise_level(record.levelname),
"component": component,
"frame_id": frame_id,
"kind": kind,
"msg": record.getMessage().replace("\n", " "),
"kv": kv_safe,
"exc": exc_text,
},
)
def _note_drop(self, *, reason: str) -> None:
"""Throttled stdout diagnostic; intentionally bypasses the logger."""
with self._drop_lock:
self._drop_counter += 1
if self._drop_counter % _DROP_LOG_EVERY_N != 1:
return
current = self._drop_counter
# stderr (not stdout) so log capture in tests doesn't confuse
# the bridge's diagnostic with normal application output.
print(
f"FdrLogBridgeHandler: dropped record #{current} (reason={reason})",
file=sys.stderr,
)
def wire_log_bridge(
fdr_client_resolver: _FdrLikeResolver,
*,
target_loggers: tuple[str, ...] = ("",),
level: int = logging.WARNING,
) -> FdrLogBridgeHandler:
"""Install a single :class:`FdrLogBridgeHandler` on ``target_loggers``.
Idempotent: re-calling replaces any prior bridge handler on the
same logger(s) — exactly one bridge per logger (AC-5).
Returns the installed handler so the composition root can keep a
handle for teardown (e.g. test isolation).
"""
handler = FdrLogBridgeHandler(fdr_client_resolver, level=level)
for name in target_loggers:
target = logging.getLogger(name)
target.handlers = [
h for h in target.handlers if not getattr(h, _BRIDGE_HANDLER_MARKER_ATTR, False)
]
target.addHandler(handler)
# Lower the logger's effective level if it is currently above WARN.
if target.level == logging.NOTSET or target.level > level:
target.setLevel(level)
return handler