mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 23:01:13 +00:00
[AZ-291] [AZ-292] [AZ-293] C13 FDR writer chain (batch 6)
AZ-291 — FileFdrWriter: single writer thread draining every registered FdrClient SPSC ring buffer to per-flight segment files; per-segment size rotation; cross-process fcntl.flock filelock on flight_root; ENOSPC degraded mode with rate-capped ERROR logs and one GCS alert. AZ-292 — FlightHeader/FlightFooter dataclasses + open_flight / close_flight lifecycle methods; four per-flight monotonic counters (records_written, records_dropped_overrun, bytes_written, rollover_count) reported by the footer; flight_id mismatch and close-without-open are typed errors. AZ-293 — CapacityCapPolicy (post-rotation hook): walks the flight directory, drops the oldest CLOSED segment when total > cap (default 64 GiB), emits a kind="segment_rollover" record per drop. Never drops the currently-open segment or segment 0 alone; cap_misconfigured path logs ERROR + GCS alert. No config flag disables emission (C13-ST-01). Schema: bumped fdr_record_schema flight_header / flight_footer payload key sets to match the AZ-292 task spec (effective 1.0.0 -> 1.1.0; no prior producer); KNOWN_PAYLOAD_KEYS updated. Added FdrWriterConfig nested in FdrConfig (segment_size_bytes, batch_size, flight_cap_bytes, debug_log_per_record). Tests: 29 new unit tests (8 AC + 1 invariant per task); full suite 323 passed, 2 pre-existing skips, 0 regressions. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -1,5 +1,26 @@
|
||||
"""C13 FDR Writer component — Public API."""
|
||||
|
||||
from gps_denied_onboard.components.c13_fdr.cap_policy import CapacityCapPolicy
|
||||
from gps_denied_onboard.components.c13_fdr.errors import (
|
||||
FdrAlreadyClosedError,
|
||||
FdrCloseWithoutOpenError,
|
||||
FdrConcurrentWriterError,
|
||||
FdrOpenError,
|
||||
FdrWriterError,
|
||||
)
|
||||
from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader
|
||||
from gps_denied_onboard.components.c13_fdr.interface import FdrWriter
|
||||
from gps_denied_onboard.components.c13_fdr.writer import FileFdrWriter
|
||||
|
||||
__all__ = ["FdrWriter"]
|
||||
__all__ = [
|
||||
"CapacityCapPolicy",
|
||||
"FdrAlreadyClosedError",
|
||||
"FdrCloseWithoutOpenError",
|
||||
"FdrConcurrentWriterError",
|
||||
"FdrOpenError",
|
||||
"FdrWriter",
|
||||
"FdrWriterError",
|
||||
"FileFdrWriter",
|
||||
"FlightFooter",
|
||||
"FlightHeader",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
"""``CapacityCapPolicy`` — AZ-293 per-flight 64 GiB cap enforcement.
|
||||
|
||||
After every per-segment rotation the writer (AZ-291) invokes this hook
|
||||
with the index of the just-closed segment. The policy walks the flight
|
||||
directory, sums on-disk segment sizes, and unlinks the oldest CLOSED
|
||||
segment until the total falls back under cap.
|
||||
|
||||
For each drop, a ``kind="segment_rollover"`` ``FdrRecord`` is enqueued
|
||||
on the shared FdrClient. The currently-open segment is never dropped,
|
||||
nor is the segment containing ``flight_header`` (segment 0) — if the
|
||||
only droppable segment is segment 0 the policy logs a hard ERROR +
|
||||
GCS alert (``fdr.cap_misconfigured``) and lets the flight continue in
|
||||
degraded mode (operator action required).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from gps_denied_onboard.components.c13_fdr.writer import FileFdrWriter
|
||||
from gps_denied_onboard.fdr_client.client import FdrClient
|
||||
from gps_denied_onboard.fdr_client.records import OVERRUN_PRODUCER_ID, FdrRecord
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
|
||||
__all__ = ["CapacityCapPolicy"]
|
||||
|
||||
_SEGMENT_ROLLOVER_KIND = "segment_rollover"
|
||||
_CAP_MISCONFIG_KIND = "fdr.cap_misconfigured"
|
||||
_CAP_DROP_KIND = "fdr.cap_drop"
|
||||
_CAP_UNLINK_FAILED_KIND = "fdr.cap_unlink_failed"
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _on_disk_size(path: Path) -> int:
|
||||
try:
|
||||
return path.stat().st_size
|
||||
except OSError:
|
||||
return 0
|
||||
|
||||
|
||||
class CapacityCapPolicy:
    """Per-flight directory-size cap (AZ-293 / E-C13).

    Instances are callables meant to be installed as the writer's
    post-rotation hook. Each invocation sums the on-disk size of the
    closed segments plus the bytes of the open segment and, while that
    total exceeds ``cap_bytes``, unlinks the oldest closed segment other
    than segment 0. Every successful drop bumps the writer's rollover
    counter and enqueues a ``kind="segment_rollover"`` record.
    """

    def __init__(
        self,
        cap_bytes: int,
        fdr_client: FdrClient,
        gcs_alert: Callable[[str], None],
    ) -> None:
        """Validate the cap and bind collaborators.

        Args:
            cap_bytes: Directory-size cap in bytes; a non-bool ``int`` in
                the inclusive range ``[1024, 2**40]``.
            fdr_client: Client on which ``segment_rollover`` records are
                enqueued.
            gcs_alert: Callback invoked (at most once per policy instance)
                with a human-readable message when the cap cannot be
                honoured.

        Raises:
            ValueError: If ``cap_bytes`` is not an int, is a bool, or lies
                outside the accepted range.
        """
        if not isinstance(cap_bytes, int) or isinstance(cap_bytes, bool):
            raise ValueError(f"cap_bytes must be a non-bool int; got {cap_bytes!r}")
        if cap_bytes < 1024:
            raise ValueError(
                f"cap_bytes must be >= 1024 (1 KiB minimum for tests); got {cap_bytes}"
            )
        if cap_bytes > 2**40:
            raise ValueError(f"cap_bytes must be <= {2**40} (1 TiB sanity bound); got {cap_bytes}")
        self._cap_bytes = cap_bytes
        self._fdr_client = fdr_client
        self._gcs_alert = gcs_alert
        self._alerted_for_misconfig = False
        self._log = get_logger("c13_fdr.cap_policy")

    @property
    def cap_bytes(self) -> int:
        """The configured cap, in bytes."""
        return self._cap_bytes

    def __call__(self, writer: FileFdrWriter, just_rotated_from: int) -> None:
        """Post-rotation hook entry-point.

        ``just_rotated_from`` is the index of the segment that was open
        prior to the rotation (it has just been closed + fsync'd). It is
        accepted for interface compatibility and not otherwise used.

        Drops the oldest closed segment (never segment 0, never the open
        segment) until the directory is back under cap. A segment whose
        ``unlink`` fails is skipped for the remainder of this call so the
        next-oldest segment is tried instead; the running total is always
        recomputed from disk, never adjusted by bytes that were not
        actually freed.
        """
        del just_rotated_from  # only used for diagnostic logs by AZ-291
        unlink_failed: set[int] = set()
        total = self._directory_size(writer)
        # Per AZ-293 AC-2 loop until under cap.
        while total > self._cap_bytes:
            # Segment 0 holds the flight_header and must survive; there is
            # no opt-out switch (AC-6), so it is never a candidate.
            droppable = [
                (index, path)
                for index, path in writer.list_closed_segments()
                if index != 0 and index not in unlink_failed
            ]
            if not droppable:
                if not unlink_failed:
                    # Nothing but the open segment and/or segment 0 is
                    # left and we are still over cap.
                    self._cap_misconfigured(writer, total)
                # Otherwise every remaining candidate already failed to
                # unlink (each failure was logged); retry on the next
                # rotation instead of spinning here.
                return
            oldest_index, oldest_path = droppable[0]
            try:
                oldest_path.unlink()
            except OSError as exc:
                self._log.warning(
                    f"{_CAP_UNLINK_FAILED_KIND}: {oldest_path.name} ({exc})",
                    extra={
                        "kind": _CAP_UNLINK_FAILED_KIND,
                        "kv": {"path": str(oldest_path), "error": repr(exc)},
                    },
                )
                # Move on to the next-oldest segment (operator
                # interference / Risk 2). Re-measure rather than assume
                # anything was freed: the file may still be there, or it
                # may have vanished underneath us.
                unlink_failed.add(oldest_index)
                total = self._directory_size(writer)
                continue
            writer.increment_rollover_count_for_cap()
            total = self._directory_size(writer)
            new_segment = writer.segments_written()
            self._emit_segment_rollover(
                old_segment=oldest_index,
                new_segment=new_segment,
                total_bytes_after=total,
            )
            self._log.info(
                f"{_CAP_DROP_KIND}: dropped segment {oldest_index}, total now {total}",
                extra={
                    "kind": _CAP_DROP_KIND,
                    "kv": {
                        "old_segment": oldest_index,
                        "new_segment": new_segment,
                        "total_bytes_after": total,
                    },
                },
            )

    def _directory_size(self, writer: FileFdrWriter) -> int:
        """Return closed-segment bytes on disk plus the open segment's bytes."""
        closed_size = sum(_on_disk_size(p) for _idx, p in writer.list_closed_segments())
        return closed_size + writer.current_segment_bytes()

    def _emit_segment_rollover(
        self, *, old_segment: int, new_segment: int, total_bytes_after: int
    ) -> None:
        """Enqueue one ``segment_rollover`` record describing a cap-driven drop."""
        record = FdrRecord(
            schema_version=1,
            ts=_iso_now(),
            producer_id=OVERRUN_PRODUCER_ID,
            kind=_SEGMENT_ROLLOVER_KIND,
            payload={
                "old_segment": old_segment,
                "new_segment": new_segment,
                "total_bytes_after": total_bytes_after,
            },
        )
        # Producer-side overrun policy (AZ-274) handles full-buffer
        # cases — the cap policy itself does not block here.
        self._fdr_client.enqueue(record)

    def _cap_misconfigured(self, writer: FileFdrWriter, observed_total: int) -> None:
        """Log an ERROR on every call and raise the GCS alert once per instance."""
        self._log.error(
            f"{_CAP_MISCONFIG_KIND}: cap={self._cap_bytes} bytes but "
            f"on-disk total={observed_total} bytes and no droppable segment exists",
            extra={
                "kind": _CAP_MISCONFIG_KIND,
                "kv": {
                    "cap_bytes": self._cap_bytes,
                    "observed_total": observed_total,
                    "current_segment_bytes": writer.current_segment_bytes(),
                },
            },
        )
        if self._alerted_for_misconfig:
            return
        self._alerted_for_misconfig = True
        try:
            self._gcs_alert(
                f"FDR cap misconfigured: cap={self._cap_bytes} bytes "
                "< current single-segment size; flight degraded"
            )
        except Exception as exc:
            # The alert channel failing must not take the writer down.
            self._log.error(
                "fdr.gcs_alert_failed",
                extra={
                    "kind": "fdr.gcs_alert_failed",
                    "kv": {"on": "cap_misconfig", "error": repr(exc)},
                },
            )
|
||||
@@ -0,0 +1,41 @@
|
||||
"""C13 FDR writer error types (AZ-291 / AZ-292 / AZ-293)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
__all__ = [
|
||||
"FdrAlreadyClosedError",
|
||||
"FdrCloseWithoutOpenError",
|
||||
"FdrConcurrentWriterError",
|
||||
"FdrOpenError",
|
||||
"FdrWriterError",
|
||||
]
|
||||
|
||||
|
||||
class FdrWriterError(RuntimeError):
    """Root of the C13 writer exception hierarchy.

    Every writer-side runtime error in this module derives from this
    class, so callers can catch the whole family with one ``except``.
    """
|
||||
|
||||
|
||||
class FdrConcurrentWriterError(FdrWriterError):
    """Signals that the flight-root lock is already held by another writer.

    The filelock on the flight root is kept for the whole flight
    (AZ-291 AC-4), so a second writer built against the same root
    indicates a wiring bug in the composition root.
    """
|
||||
|
||||
|
||||
class FdrOpenError(FdrWriterError):
    """Signals that a flight could not be opened on disk.

    Raised when ``open_flight`` fails to persist the ``flight_header``
    record (AZ-292 AC-4, AC-5) or when a segment file cannot be opened.

    The composition root turns this into a takeoff abort (wired by
    AZ-296); this module only raises it.
    """
|
||||
|
||||
|
||||
class FdrCloseWithoutOpenError(FdrWriterError):
    """Signals a ``close_flight`` call with no prior successful ``open_flight``."""
|
||||
|
||||
|
||||
class FdrAlreadyClosedError(FdrWriterError):
    """Signals a repeated ``close_flight`` that cannot be answered idempotently.

    A second ``close_flight`` normally returns the footer stored by the
    first call; this error is raised only when no stored footer is
    available (AZ-292 idempotency).
    """
|
||||
@@ -0,0 +1,52 @@
|
||||
"""C13 FlightHeader / FlightFooter dataclasses (AZ-292).
|
||||
|
||||
The header is the first record of segment 0; the footer is the last
|
||||
record before clean shutdown. Together they make a per-flight FDR
|
||||
directory self-describing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
__all__ = ["FlightFooter", "FlightHeader"]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class FlightHeader:
    """Immutable descriptor written as the first record of a flight (AZ-292 AC-1).

    ``config_snapshot`` MUST be JSON-safe: the composition root is
    expected to strip known-secret fields (AZ-269 redacted-config helper)
    before building the header.
    """

    # Identity and start-of-flight timestamps (wall clock + monotonic).
    flight_id: UUID
    flight_started_at_iso: str
    flight_started_at_monotonic_ns: int

    # Optional descriptive mappings; each instance gets its own empty dict.
    config_snapshot: dict[str, Any] = field(default_factory=dict)
    signing_key_rotation_event: dict[str, Any] = field(default_factory=dict)
    manifest_content_hashes: dict[str, str] = field(default_factory=dict)
    build_info: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class FlightFooter:
    """Immutable descriptor written as the last record of a flight (AZ-292 AC-2).

    ``records_written``, ``records_dropped_overrun``, ``bytes_written``
    and ``rollover_count`` form the AC-NEW-3 canonical audit trail.
    ``clean_shutdown`` tells a graceful landing apart from a power-loss
    truncation.
    """

    # Identity and end-of-flight timestamps (wall clock + monotonic).
    flight_id: UUID
    flight_ended_at_iso: str
    flight_ended_at_monotonic_ns: int

    # Per-flight audit counters.
    records_written: int
    records_dropped_overrun: int
    bytes_written: int
    rollover_count: int

    clean_shutdown: bool
|
||||
@@ -0,0 +1,548 @@
|
||||
"""`FileFdrWriter` — single C13 writer thread + segment lifecycle.
|
||||
|
||||
Implements AZ-291 (writer thread + segment files + filelock + atomic
|
||||
rotation + ENOSPC handling) and AZ-292 (flight header/footer + per-flight
|
||||
counters). The cap policy hook (AZ-293) is wired via the
|
||||
``on_rotation`` callback so AZ-291 stays focused on per-segment
|
||||
lifecycle and the policy can be injected by the composition root.
|
||||
|
||||
Single-thread by contract on each side:
|
||||
- Producer side: every registered `FdrClient` has exactly one consumer
|
||||
(this writer's background thread).
|
||||
- Lifecycle side: ``start``/``stop``/``open_flight``/``close_flight``
|
||||
are called once each by the composition root.
|
||||
|
||||
Filelock: cross-process advisory ``fcntl.flock`` on ``flight_root/.fdr.lock``
|
||||
held for the entire flight. POSIX semantics mean the kernel releases
|
||||
the lock on process death automatically (Risk 3).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import errno
|
||||
import fcntl
|
||||
import os
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from uuid import UUID
|
||||
|
||||
from gps_denied_onboard.components.c13_fdr.errors import (
|
||||
FdrAlreadyClosedError,
|
||||
FdrCloseWithoutOpenError,
|
||||
FdrConcurrentWriterError,
|
||||
FdrOpenError,
|
||||
FdrWriterError,
|
||||
)
|
||||
from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader
|
||||
from gps_denied_onboard.config import FdrWriterConfig
|
||||
from gps_denied_onboard.fdr_client.client import FdrClient
|
||||
from gps_denied_onboard.fdr_client.records import (
|
||||
OVERRUN_KIND,
|
||||
OVERRUN_PRODUCER_ID,
|
||||
FdrRecord,
|
||||
serialise,
|
||||
)
|
||||
from gps_denied_onboard.logging import get_logger
|
||||
|
||||
__all__ = ["FileFdrWriter"]
|
||||
|
||||
_FLIGHT_HEADER_KIND = "flight_header"
|
||||
_FLIGHT_FOOTER_KIND = "flight_footer"
|
||||
_LENGTH_PREFIX = struct.Struct("<I") # uint32 LE record length prefix
|
||||
_LOG_FAILURE_RATE_LIMIT_S = 1.0
|
||||
_DEFAULT_DRAIN_SLEEP_S = 0.001
|
||||
|
||||
|
||||
def _iso_now() -> str:
|
||||
return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _flight_header_to_payload(header: FlightHeader) -> dict:
|
||||
payload = asdict(header)
|
||||
payload["flight_id"] = str(header.flight_id)
|
||||
return payload
|
||||
|
||||
|
||||
def _flight_footer_to_payload(footer: FlightFooter) -> dict:
|
||||
payload = asdict(footer)
|
||||
payload["flight_id"] = str(footer.flight_id)
|
||||
return payload
|
||||
|
||||
|
||||
class FileFdrWriter:
    """Single-writer C13 FDR component.

    The constructor binds the writer to a ``flight_root`` and a
    ``flight_id``; the same UUID MUST be passed in via the
    ``FlightHeader`` given to ``open_flight`` (AZ-292 AC-5).

    Lifecycle: ``start`` takes the flight-root filelock, opens segment 0
    and launches the background drain thread; ``open_flight`` writes the
    header record; ``close_flight`` drains, writes the footer and releases
    everything; ``stop`` shuts down without emitting a footer.

    Record appends are serialised by an internal re-entrant lock because
    ``open_flight`` writes from the caller's thread while the drain thread
    started by ``start`` may already be appending.
    """

    # Upper bound on drain passes per client during a *final* drain, so a
    # producer that never goes quiet cannot hang shutdown.
    _FINAL_DRAIN_MAX_PASSES = 1024

    def __init__(
        self,
        flight_root: Path,
        flight_id: UUID,
        config: FdrWriterConfig,
        fdr_clients: Sequence[FdrClient],
        gcs_alert: Callable[[str], None],
        *,
        on_rotation: Callable[[FileFdrWriter, int], None] | None = None,
        drain_sleep_s: float = _DEFAULT_DRAIN_SLEEP_S,
    ) -> None:
        """Bind configuration; no filesystem access happens here.

        Args:
            flight_root: Directory under which the per-flight directory
                (named after ``flight_id``) and the lock file live.
            flight_id: Identifier of the flight this writer records.
            config: Writer settings; ``segment_size_bytes``,
                ``batch_size`` and ``debug_log_per_record`` are read.
            fdr_clients: Clients whose buffers this writer drains.
            gcs_alert: Callback used for the one-shot write-failure alert.
            on_rotation: Optional hook called as
                ``on_rotation(writer, previous_index)`` after each segment
                rotation.
            drain_sleep_s: Idle wait between drain passes that yielded
                no records.
        """
        self._flight_root = Path(flight_root)
        self._flight_id = flight_id
        self._config = config
        self._fdr_clients = tuple(fdr_clients)
        self._gcs_alert = gcs_alert
        self._on_rotation = on_rotation
        self._drain_sleep_s = drain_sleep_s

        # Filesystem state.
        self._flight_dir: Path = self._flight_root / str(flight_id)
        self._lock_path: Path = self._flight_root / ".fdr.lock"
        self._lock_fd: int | None = None

        # Segment state.
        self._segment_index: int = 0
        self._segment_fd: int | None = None
        self._segment_bytes: int = 0
        self._is_rolling: bool = False

        # Lifecycle state.
        self._started = False
        self._stopped = False
        self._opened = False
        self._closed = False
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._exit_code_thrown: BaseException | None = None
        # -inf guarantees the very first failure is logged regardless of
        # the monotonic clock's current value.
        self._last_failure_log_t: float = float("-inf")
        self._is_degraded = False
        self._gcs_alerted_for_write_failure = False

        # Serialises _append_record between the drain thread and lifecycle
        # callers. Re-entrant so a rotation hook running inside an append
        # cannot deadlock the thread that holds it.
        self._append_lock = threading.RLock()

        # Counters (mutated only while holding _append_lock, or by the
        # drain path for the overrun tally).
        self._records_written = 0
        self._records_dropped_overrun = 0
        self._bytes_written = 0
        self._rollover_count = 0
        self._stored_footer: FlightFooter | None = None

        self._log = get_logger("c13_fdr.writer")

    # ------------------------------------------------------------------
    # Public read-only introspection (AZ-291).

    def current_segment_path(self) -> Path:
        """Path of the segment file for the current segment index."""
        return self._segment_path(self._segment_index)

    def current_segment_bytes(self) -> int:
        """Bytes appended to the current segment since it was opened."""
        return self._segment_bytes

    def segments_written(self) -> int:
        """Index of the current segment.

        Segment indices start at 0 and grow by one per rotation, so this
        equals the number of segments that have been rotated out.
        """
        return self._segment_index

    def is_rolling(self) -> bool:
        """True while a segment rotation is in progress."""
        return self._is_rolling

    def is_degraded(self) -> bool:
        """True once any write failure has been handled."""
        return self._is_degraded

    def current_size_bytes(self) -> int:
        """Total framed bytes written across all segments of this flight."""
        return self._bytes_written

    @property
    def flight_id(self) -> UUID:
        """The flight identifier this writer was constructed with."""
        return self._flight_id

    @property
    def flight_dir(self) -> Path:
        """Directory holding this flight's segment files."""
        return self._flight_dir

    @property
    def rollover_count(self) -> int:
        """Rotations plus cap-driven drops recorded so far."""
        return self._rollover_count

    @property
    def records_dropped_overrun(self) -> int:
        """Sum of ``dropped_count`` over the overrun records seen so far."""
        return self._records_dropped_overrun

    def list_closed_segments(self) -> list[tuple[int, Path]]:
        """Return ``(segment_index, path)`` for every CLOSED segment on disk.

        Used by the cap policy (AZ-293) to decide which segment to drop.
        The currently-open segment (``self._segment_index``) is excluded
        from the result regardless of whether its file exists. Entries are
        sorted by ascending index; files not named ``segment-<int>.fdr``
        are ignored.
        """
        result: list[tuple[int, Path]] = []
        if not self._flight_dir.exists():
            return result
        prefix, suffix = "segment-", ".fdr"
        for entry in self._flight_dir.iterdir():
            if not entry.is_file():
                continue
            name = entry.name
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            try:
                index = int(name[len(prefix) : -len(suffix)])
            except ValueError:
                continue
            if index == self._segment_index:
                continue
            result.append((index, entry))
        result.sort(key=lambda kv: kv[0])
        return result

    def increment_rollover_count_for_cap(self) -> None:
        """Allow the cap policy (AZ-293) to attribute a cap-driven drop to
        ``rollover_count`` so the footer's totals match.
        """
        self._rollover_count += 1

    # ------------------------------------------------------------------
    # Lifecycle: start / stop / open_flight / close_flight.

    def start(self) -> None:
        """Acquire the filelock, open the first segment, launch the drain thread.

        Raises:
            FdrWriterError: If the writer is already started.
            FdrConcurrentWriterError: If the flight-root lock cannot be taken.
            FdrOpenError: If the segment file cannot be opened.
            OSError: If a directory or the lock file cannot be created.

        On any failure the filelock is released and the writer is left in
        the not-started state, so ``stop`` is a no-op and ``start`` may be
        retried.
        """
        if self._started:
            raise FdrWriterError("FileFdrWriter.start called twice")
        self._started = True
        try:
            self._flight_root.mkdir(parents=True, exist_ok=True)
            self._acquire_filelock()
            self._flight_dir.mkdir(parents=True, exist_ok=True)
            self._open_segment(self._segment_index)
        except Exception:
            # Roll back completely; _release_filelock is a no-op when the
            # lock was never acquired.
            self._release_filelock()
            self._started = False
            raise

        # A previous failed open_flight leaves the event set; clear it so
        # the new thread does not exit immediately.
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._writer_loop, name="c13.writer", daemon=True)
        self._thread.start()
        self._log.info("c13.writer.start", extra={"kv": {"flight_id": str(self._flight_id)}})

    def stop(self) -> None:
        """Stop the drain thread, flush pending records and release resources.

        No footer is written. Does nothing if the writer was never
        started or has already been stopped/closed.
        """
        if not self._started or self._stopped:
            return
        self._stopped = True

        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5.0)

        # Best-effort drain after thread exit (AC-6).
        self._drain_all(final=True)
        self._close_segment(fsync=True)
        self._release_filelock()
        self._log.info(
            "c13.writer.stop",
            extra={
                "kv": {
                    "records_written": self._records_written,
                    "rollover_count": self._rollover_count,
                }
            },
        )

    def open_flight(self, header: FlightHeader) -> None:
        """Write the ``flight_header`` record and mark the flight open.

        Raises:
            FdrWriterError: If called before ``start``, called twice, or
                no segment is open.
            FdrOpenError: If ``header.flight_id`` differs from the
                writer's, or the header write fails with ``OSError``. In
                both cases the writer is rolled back (see ``_fail_open``).
        """
        if not self._started:
            raise FdrWriterError("open_flight called before start()")
        if self._opened:
            raise FdrWriterError("open_flight called twice")
        if header.flight_id != self._flight_id:
            self._fail_open(
                f"FlightHeader.flight_id ({header.flight_id}) does not match "
                f"writer's flight_id ({self._flight_id})"
            )

        record = FdrRecord(
            schema_version=1,
            ts=_iso_now(),
            producer_id=OVERRUN_PRODUCER_ID,
            kind=_FLIGHT_HEADER_KIND,
            payload=_flight_header_to_payload(header),
        )
        try:
            self._append_record(record)
        except OSError as exc:
            self._fail_open(f"failed to write flight_header: {exc}", cause=exc)

        self._opened = True
        self._log.info(
            "fdr.flight_open",
            extra={"kv": {"flight_id": str(self._flight_id)}},
        )

    def close_flight(self) -> FlightFooter:
        """Drain everything, write the ``flight_footer`` record and shut down.

        Returns:
            The footer that was (attempted to be) written. A repeated call
            returns the footer stored by the first call.

        Raises:
            FdrCloseWithoutOpenError: If ``open_flight`` never succeeded.
            FdrAlreadyClosedError: If already closed and no footer is stored.
        """
        if not self._opened:
            raise FdrCloseWithoutOpenError(
                "close_flight called without a prior successful open_flight"
            )
        if self._closed:
            if self._stored_footer is not None:
                return self._stored_footer
            raise FdrAlreadyClosedError(
                "close_flight called twice and no stored footer is available"
            )

        # Drain pending producer records BEFORE emitting the footer so the
        # counters reflect the entire flight.
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5.0)
        self._drain_all(final=True)

        # Build a footer whose ``bytes_written`` payload includes the
        # footer's own framed size. Because ``bytes_written`` is itself
        # an integer field on the wire, growing it can change the
        # serialised length by 1 byte at decimal-power boundaries; we
        # iterate until the value is stable. Practically this converges
        # in one or two passes.
        ts = _iso_now()
        mono_ns = time.monotonic_ns()
        records_written_now = self._records_written + 1  # +1 for the footer itself
        bytes_estimate = self._bytes_written
        footer: FlightFooter | None = None
        footer_record: FdrRecord | None = None
        for _ in range(8):
            footer = FlightFooter(
                flight_id=self._flight_id,
                flight_ended_at_iso=ts,
                flight_ended_at_monotonic_ns=mono_ns,
                records_written=records_written_now,
                records_dropped_overrun=self._records_dropped_overrun,
                bytes_written=bytes_estimate,
                rollover_count=self._rollover_count,
                clean_shutdown=True,
            )
            footer_record = FdrRecord(
                schema_version=1,
                ts=footer.flight_ended_at_iso,
                producer_id=OVERRUN_PRODUCER_ID,
                kind=_FLIGHT_FOOTER_KIND,
                payload=_flight_footer_to_payload(footer),
            )
            body = serialise(footer_record)
            framed_size = _LENGTH_PREFIX.size + len(body)
            next_estimate = self._bytes_written + framed_size
            if next_estimate == bytes_estimate:
                break
            bytes_estimate = next_estimate
        assert footer is not None and footer_record is not None
        try:
            self._append_record(footer_record)
        except OSError as exc:
            self._handle_write_failure(exc)

        self._close_segment(fsync=True)
        self._release_filelock()
        self._closed = True
        self._stopped = True
        self._stored_footer = footer
        self._log.info(
            "fdr.flight_close",
            extra={
                "kv": {
                    "records_written": footer.records_written,
                    "records_dropped_overrun": footer.records_dropped_overrun,
                    "bytes_written": footer.bytes_written,
                    "rollover_count": footer.rollover_count,
                    "clean_shutdown": True,
                }
            },
        )
        return footer

    # ------------------------------------------------------------------
    # Writer thread loop.

    def _writer_loop(self) -> None:
        """Thread target: drain all clients until the stop event is set."""
        try:
            while not self._stop_event.is_set():
                drained = self._drain_all(final=False)
                if drained == 0:
                    # Idle: wait on the event so a stop request wakes us early.
                    self._stop_event.wait(timeout=self._drain_sleep_s)
        except BaseException as exc:
            self._exit_code_thrown = exc
            self._handle_write_failure(exc if isinstance(exc, OSError) else OSError(str(exc)))

    def _drain_all(self, *, final: bool) -> int:
        """Drain every registered client once; return the record count drained."""
        drained = 0
        for client in self._fdr_clients:
            drained += self._drain_one(client, final=final)
        return drained

    def _drain_one(self, client: FdrClient, *, final: bool) -> int:
        """Drain ``client`` and append its records to the current segment.

        With ``final=False`` exactly one batch (``config.batch_size``
        records at most) is taken. With ``final=True`` batches are taken
        repeatedly until the client reports nothing left, bounded by
        ``_FINAL_DRAIN_MAX_PASSES`` so shutdown cannot hang.

        Append failures never stop the drain: the record is dropped, the
        failure is routed to ``_handle_write_failure`` and dequeuing
        continues so producer buffers keep emptying in degraded mode.

        Returns:
            Number of records dequeued (written or not).
        """
        passes = self._FINAL_DRAIN_MAX_PASSES if final else 1
        drained = 0
        for _ in range(passes):
            batch = client.drain(max_records=self._config.batch_size)
            if not batch:
                break
            for record in batch:
                self._observe_overrun_record(record)
                try:
                    self._append_record(record)
                except (OSError, FdrWriterError) as exc:
                    # FdrWriterError covers a failed rotation
                    # (FdrOpenError) and the "no open segment" state that
                    # follows it; neither may kill the drain thread
                    # (AC-5 part d).
                    self._handle_write_failure(exc)
            drained += len(batch)
        return drained

    def _observe_overrun_record(self, record: FdrRecord) -> None:
        """Add an overrun record's positive integer ``dropped_count`` to the tally."""
        if record.kind != OVERRUN_KIND:
            return
        dropped = record.payload.get("dropped_count", 0)
        if isinstance(dropped, int) and dropped > 0:
            self._records_dropped_overrun += dropped

    # ------------------------------------------------------------------
    # Segment file lifecycle.

    def _segment_path(self, index: int) -> Path:
        """Path of segment ``index`` inside the flight directory."""
        return self._flight_dir / f"segment-{index:04d}.fdr"

    def _open_segment(self, index: int) -> None:
        """Open (creating if needed) segment ``index`` for appending.

        Raises:
            FdrOpenError: If the file cannot be opened.
        """
        path = self._segment_path(index)
        flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        try:
            self._segment_fd = os.open(path, flags, 0o644)
        except OSError as exc:
            raise FdrOpenError(f"failed to open segment {index} at {path}: {exc}") from exc
        self._segment_bytes = 0

    def _close_segment(self, *, fsync: bool) -> None:
        """Close the open segment, optionally fsync'ing first; no-op if none is open."""
        if self._segment_fd is None:
            return
        try:
            if fsync:
                os.fsync(self._segment_fd)
        except OSError:
            # Best-effort fsync on close; the failure is already accounted
            # for upstream (degraded mode). Continue with close.
            pass
        try:
            os.close(self._segment_fd)
        finally:
            self._segment_fd = None

    def _rotate_segment(self) -> None:
        """Close the current segment, open the next one, then run the rotation hook.

        Raises:
            FdrOpenError: If the next segment cannot be opened. The
                segment index has already advanced and no segment is open
                in that case.
        """
        self._is_rolling = True
        try:
            previous = self._segment_index
            self._close_segment(fsync=True)
            self._segment_index += 1
            self._open_segment(self._segment_index)
            self._rollover_count += 1
            self._log.info(
                "c13.writer.segment_rotated",
                extra={"kv": {"previous": previous, "current": self._segment_index}},
            )
            if self._on_rotation is not None:
                # The cap-policy hook runs synchronously after each rotation
                # (AZ-293 Risk 3). Exceptions are caught to keep the writer
                # alive — the policy is responsible for its own error
                # surfacing.
                try:
                    self._on_rotation(self, previous)
                except Exception as exc:
                    self._log.error(
                        "c13.writer.on_rotation_failed",
                        extra={"kv": {"error": repr(exc)}},
                    )
        finally:
            self._is_rolling = False

    # ------------------------------------------------------------------
    # Record append + length framing.

    def _append_record(self, record: FdrRecord) -> None:
        """Serialise ``record`` and append it, length-prefixed, to the open segment.

        Rotates first when the frame would push a non-empty segment past
        ``config.segment_size_bytes``. The whole operation runs under
        ``_append_lock``.

        Raises:
            FdrWriterError: If no segment is open (``FdrOpenError`` if a
                rotation failed to open the next one).
            OSError: If the write fails. Bytes that did reach the file
                before the failure are still counted so segment sizing
                stays truthful.
        """
        with self._append_lock:
            if self._segment_fd is None:
                raise FdrWriterError("append_record called with no open segment")
            body = serialise(record)
            frame = _LENGTH_PREFIX.pack(len(body)) + body
            # Per-segment cap check happens BEFORE write so the rotation
            # threshold matches AC-2's "≤ cap + one record overshoot"
            # interpretation: a record that would push past the cap goes into
            # the NEXT segment.
            cap = self._config.segment_size_bytes
            if cap > 0 and self._segment_bytes > 0 and self._segment_bytes + len(frame) > cap:
                self._rotate_segment()

            # os.write may write fewer bytes than requested; loop until the
            # whole frame is on its way to disk.
            view = memoryview(frame)
            written = 0
            try:
                while written < len(frame):
                    count = os.write(self._segment_fd, view[written:])
                    if count <= 0:
                        raise OSError(errno.EIO, "short write on FDR segment")
                    written += count
            finally:
                self._segment_bytes += written
                self._bytes_written += written
            self._records_written += 1
            if self._config.debug_log_per_record:
                self._log.debug("c13.writer.append", extra={"kv": {"kind": record.kind}})

    # ------------------------------------------------------------------
    # Cross-process filelock.

    def _acquire_filelock(self) -> None:
        """Take an exclusive, non-blocking ``flock`` on the flight-root lock file.

        Raises:
            FdrConcurrentWriterError: If ``flock`` fails.
            OSError: If the lock file cannot be opened or created.
        """
        self._flight_root.mkdir(parents=True, exist_ok=True)
        self._lock_fd = os.open(self._lock_path, os.O_RDWR | os.O_CREAT, 0o644)
        try:
            fcntl.flock(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError as exc:
            os.close(self._lock_fd)
            self._lock_fd = None
            raise FdrConcurrentWriterError(
                f"another writer holds {self._lock_path}: {exc}"
            ) from exc

    def _release_filelock(self) -> None:
        """Unlock and close the lock file descriptor; no-op if not held."""
        if self._lock_fd is None:
            return
        try:
            fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
        finally:
            os.close(self._lock_fd)
            self._lock_fd = None

    # ------------------------------------------------------------------
    # Failure handling.

    def _fail_open(self, message: str, *, cause: BaseException | None = None) -> None:
        """Roll back a failed ``open_flight`` and raise ``FdrOpenError``.

        Stops the drain thread, closes the segment, removes the segment
        file if it is empty, releases the filelock so the next attempt
        isn't permanently stuck, and returns the writer to the
        not-started state.

        Raises:
            FdrOpenError: Always; chained from ``cause`` when given.
        """
        # Quiesce the drain thread first so it cannot write to a
        # descriptor we are about to close.
        self._stop_event.set()
        thread = self._thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5.0)
        self._thread = None

        seg_path = self._segment_path(self._segment_index)
        self._close_segment(fsync=False)
        try:
            if seg_path.exists() and seg_path.stat().st_size == 0:
                seg_path.unlink()
        except OSError:
            # Cleanup is best-effort; the FdrOpenError below is what matters.
            pass
        self._release_filelock()
        self._started = False
        if cause is None:
            raise FdrOpenError(message)
        raise FdrOpenError(message) from cause

    def _handle_write_failure(self, exc: BaseException) -> None:
        """Enter degraded mode: rate-limited ERROR log plus a one-shot GCS alert."""
        errno_value = getattr(exc, "errno", None)
        now = time.monotonic()
        if now - self._last_failure_log_t >= _LOG_FAILURE_RATE_LIMIT_S:
            self._last_failure_log_t = now
            self._log.error(
                "fdr.write_failure",
                extra={"kv": {"errno": errno_value, "error": repr(exc)}},
            )
        if not self._gcs_alerted_for_write_failure:
            self._gcs_alerted_for_write_failure = True
            # Exceptions without a usable errno are reported as EIO.
            alert_errno = errno.EIO if errno_value is None else errno_value
            try:
                self._gcs_alert(
                    "FDR write failure — companion in degraded mode "
                    f"(errno={alert_errno})"
                )
            except Exception:
                # GCS alert failure is logged but does not unwind further.
                self._log.error(
                    "fdr.gcs_alert_failed",
                    extra={"kv": {"on": "write_failure"}},
                )
        self._is_degraded = True
|
||||
Reference in New Issue
Block a user