"""`FileFdrWriter` — single C13 writer thread + segment lifecycle. Implements AZ-291 (writer thread + segment files + filelock + atomic rotation + ENOSPC handling) and AZ-292 (flight header/footer + per-flight counters). The cap policy hook (AZ-293) is wired via the ``on_rotation`` callback so AZ-291 stays focused on per-segment lifecycle and the policy can be injected by the composition root. Single-thread by contract on each side: - Producer side: every registered `FdrClient` has exactly one consumer (this writer's background thread). - Lifecycle side: ``start``/``stop``/``open_flight``/``close_flight`` are called once each by the composition root. Filelock: cross-process advisory ``fcntl.flock`` on ``flight_root/.fdr.lock`` held for the entire flight. POSIX semantics mean the kernel releases the lock on process death automatically (Risk 3). """ from __future__ import annotations import errno import fcntl import os import struct import threading import time from collections.abc import Callable, Sequence from dataclasses import asdict from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING from uuid import UUID from gps_denied_onboard.clock.wall_clock import WallClock from gps_denied_onboard.components.c13_fdr.errors import ( FdrAlreadyClosedError, FdrCloseWithoutOpenError, FdrConcurrentWriterError, FdrOpenError, FdrWriterError, ) from gps_denied_onboard.components.c13_fdr.headers import FlightFooter, FlightHeader from gps_denied_onboard.components.c13_fdr.record_kind_policy import ( GateDecision, RecordKindPolicy, ) from gps_denied_onboard.config import FdrWriterConfig from gps_denied_onboard.fdr_client.client import FdrClient from gps_denied_onboard.fdr_client.records import ( OVERRUN_KIND, OVERRUN_PRODUCER_ID, FdrRecord, serialise, ) from gps_denied_onboard.logging import get_logger if TYPE_CHECKING: from gps_denied_onboard.clock import Clock __all__ = ["FileFdrWriter"] _FLIGHT_HEADER_KIND = "flight_header" 
_FLIGHT_FOOTER_KIND = "flight_footer"

# NOTE(review): in the reviewed copy of this file the text between the opening
# quote of the ``_LENGTH_PREFIX`` format string and the return annotation of
# ``_iso_now`` was missing (it looks like an angle-bracket stripper ate it).
# The three values below are reconstructions, NOT the original values; confirm
# them against version control before merging.
_LENGTH_PREFIX = struct.Struct("<I")  # presumably a little-endian u32 frame length — TODO confirm
_DEFAULT_DRAIN_SLEEP_S = 0.005  # idle poll interval of the writer thread — TODO confirm value
_LOG_FAILURE_RATE_LIMIT_S = 1.0  # min spacing of "fdr.write_failure" logs — TODO confirm value

# How long lifecycle calls wait for the writer thread to exit.
_THREAD_JOIN_TIMEOUT_S = 5.0
# Safety valve for the final drain: a producer that keeps emitting during
# shutdown must not be able to keep stop()/close_flight() spinning forever.
_FINAL_DRAIN_MAX_BATCHES = 1024


def _iso_now() -> str:
    """Return the current UTC wall-clock time as an ISO-8601 string."""
    return datetime.now(tz=timezone.utc).isoformat()


def _flight_header_to_payload(header: FlightHeader) -> dict:
    """Convert *header* to a plain dict with ``flight_id`` rendered as ``str``."""
    payload = asdict(header)
    payload["flight_id"] = str(header.flight_id)
    return payload


def _flight_footer_to_payload(footer: FlightFooter) -> dict:
    """Convert *footer* to a plain dict with ``flight_id`` rendered as ``str``."""
    payload = asdict(footer)
    payload["flight_id"] = str(footer.flight_id)
    return payload


class FileFdrWriter:
    """Single-writer C13 FDR component.

    Constructor binds the writer to a specific ``flight_root`` and a
    ``flight_id``; the same UUID MUST be passed in via the ``FlightHeader``
    to ``open_flight`` (AZ-292 AC-5).
    """

    def __init__(
        self,
        flight_root: Path,
        flight_id: UUID,
        config: FdrWriterConfig,
        fdr_clients: Sequence[FdrClient],
        gcs_alert: Callable[[str], None],
        *,
        on_rotation: Callable[[FileFdrWriter, int], None] | None = None,
        record_kind_policy: RecordKindPolicy | None = None,
        drain_sleep_s: float = _DEFAULT_DRAIN_SLEEP_S,
        clock: Clock | None = None,
    ) -> None:
        """Store collaborators and initialise state; performs no I/O.

        Args:
            flight_root: Directory that holds the lock file and one
                sub-directory per flight.
            flight_id: Identifier of the flight; names the flight directory.
            config: Writer tunables (``batch_size``, ``segment_size_bytes``,
                ``debug_log_per_record`` are read by this class).
            fdr_clients: Producer-side clients drained by the writer thread.
            gcs_alert: Callback invoked once with a message on the first
                write failure.
            on_rotation: Optional hook called as ``on_rotation(writer,
                previous_index)`` after each segment rotation.
            record_kind_policy: Optional gate that may drop records before
                they are written.
            drain_sleep_s: Idle wait between drain passes that found nothing.
            clock: Monotonic clock source; defaults to ``WallClock()``.
        """
        self._flight_root = Path(flight_root)
        self._flight_id = flight_id
        self._config = config
        self._fdr_clients = tuple(fdr_clients)
        self._gcs_alert = gcs_alert
        self._on_rotation = on_rotation
        self._record_kind_policy = record_kind_policy
        self._drain_sleep_s = drain_sleep_s
        self._clock: Clock = clock if clock is not None else WallClock()

        # Filesystem state.
        self._flight_dir: Path = self._flight_root / str(flight_id)
        self._lock_path: Path = self._flight_root / ".fdr.lock"
        self._lock_fd: int | None = None

        # Segment state.
        self._segment_index: int = 0
        self._segment_fd: int | None = None
        self._segment_bytes: int = 0
        self._is_rolling: bool = False

        # Lifecycle state.
        self._started = False
        self._stopped = False
        self._opened = False
        self._closed = False
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._exit_code_thrown: BaseException | None = None
        # -inf (not 0.0) so the very first failure is always logged: the
        # reference point of time.monotonic() is undefined and may be small.
        self._last_failure_log_t: float = float("-inf")
        self._is_degraded = False
        self._gcs_alerted_for_write_failure = False

        # Serialises _append_record. The writer thread is the main appender,
        # but open_flight() appends the header from the caller's thread while
        # the writer thread is already running. Re-entrant because the
        # on_rotation hook runs while the lock is held.
        self._append_lock = threading.RLock()

        # Counters (mutated only under _append_lock / by the draining thread).
        self._records_written = 0
        self._records_dropped_overrun = 0
        self._bytes_written = 0
        self._rollover_count = 0

        self._stored_footer: FlightFooter | None = None

        self._log = get_logger("c13_fdr.writer")

    # ------------------------------------------------------------------
    # Public read-only introspection (AZ-291).

    def current_segment_path(self) -> Path:
        """Return the path of the segment at the current index."""
        return self._segment_path(self._segment_index)

    def current_segment_bytes(self) -> int:
        """Return the number of bytes appended to the current segment."""
        return self._segment_bytes

    def segments_written(self) -> int:
        """Return the current segment index."""
        # Number of CLOSED segments. The currently-open one isn't counted
        # until it rotates or close_flight closes it.
        return self._segment_index

    def is_rolling(self) -> bool:
        """Return True while a segment rotation is in progress."""
        return self._is_rolling

    def is_degraded(self) -> bool:
        """Return True once any write failure has been handled."""
        return self._is_degraded

    def current_size_bytes(self) -> int:
        """Return the total bytes appended across all segments."""
        return self._bytes_written

    @property
    def flight_id(self) -> UUID:
        return self._flight_id

    @property
    def flight_dir(self) -> Path:
        return self._flight_dir

    @property
    def rollover_count(self) -> int:
        return self._rollover_count

    @property
    def records_dropped_overrun(self) -> int:
        return self._records_dropped_overrun

    def list_closed_segments(self) -> list[tuple[int, Path]]:
        """Return ``(segment_index, path)`` for every CLOSED segment on disk.

        Used by the cap policy (AZ-293) to decide which segment to drop. The
        currently-open segment (``self._segment_index``) is excluded from the
        result regardless of whether its file exists. The result is sorted by
        index; an absent flight directory yields an empty list.
        """
        result: list[tuple[int, Path]] = []
        if not self._flight_dir.exists():
            return result
        for entry in self._flight_dir.iterdir():
            if not entry.is_file():
                continue
            name = entry.name
            if not (name.startswith("segment-") and name.endswith(".fdr")):
                continue
            try:
                index = int(name[len("segment-") : -len(".fdr")])
            except ValueError:
                continue
            if index == self._segment_index:
                continue
            result.append((index, entry))
        result.sort(key=lambda kv: kv[0])
        return result

    def increment_rollover_count_for_cap(self) -> None:
        """Allow the cap policy (AZ-293) to attribute a cap-driven drop to
        ``rollover_count`` so the footer's totals match.
        """
        self._rollover_count += 1

    # ------------------------------------------------------------------
    # Lifecycle: start / stop / open_flight / close_flight.

    def start(self) -> None:
        """Acquire the filelock, open the first segment and start the thread.

        ``_started`` is only set once every step has succeeded, so a failed
        start (lock held elsewhere, directory or segment not creatable,
        thread not startable) leaves the writer restartable and makes a
        later ``stop()`` a no-op.

        Raises:
            FdrWriterError: If the writer is already started.
            FdrConcurrentWriterError: If the filelock cannot be acquired.
            FdrOpenError: If the first segment cannot be opened.
        """
        if self._started:
            raise FdrWriterError("FileFdrWriter.start called twice")
        self._flight_root.mkdir(parents=True, exist_ok=True)
        self._acquire_filelock()
        try:
            self._flight_dir.mkdir(parents=True, exist_ok=True)
            self._open_segment(self._segment_index)
            writer_thread = threading.Thread(
                target=self._writer_loop, name="c13.writer", daemon=True
            )
            writer_thread.start()
        except Exception:
            # Roll back everything acquired so far; both calls are no-ops
            # for resources that were never obtained.
            self._close_segment(fsync=False)
            self._release_filelock()
            raise
        self._thread = writer_thread
        self._started = True
        self._log.info("c13.writer.start", extra={"kv": {"flight_id": str(self._flight_id)}})

    def stop(self) -> None:
        """Stop the writer thread, drain the producers and release resources.

        A no-op if the writer was never started or is already stopped
        (``close_flight`` marks the writer stopped as well).
        """
        if not self._started or self._stopped:
            return
        self._stopped = True
        self._stop_writer_thread()
        try:
            # Best-effort drain after thread exit (AC-6).
            self._drain_all(final=True)
        finally:
            # Teardown must happen even if the drain raises, otherwise the
            # segment fd and the cross-process lock leak.
            self._close_segment(fsync=True)
            self._release_filelock()
        self._log.info(
            "c13.writer.stop",
            extra={
                "kv": {
                    "records_written": self._records_written,
                    "rollover_count": self._rollover_count,
                }
            },
        )

    def open_flight(self, header: FlightHeader) -> None:
        """Write the flight header record and mark the flight open.

        Raises:
            FdrWriterError: If called before ``start()`` or called twice.
            FdrOpenError: If ``header.flight_id`` differs from the writer's
                flight id, or the header record cannot be written. In both
                cases the writer is rolled back via ``_fail_open``.
        """
        if not self._started:
            raise FdrWriterError("open_flight called before start()")
        if self._opened:
            raise FdrWriterError("open_flight called twice")
        if header.flight_id != self._flight_id:
            self._fail_open(
                f"FlightHeader.flight_id ({header.flight_id}) does not match "
                f"writer's flight_id ({self._flight_id})"
            )
        record = FdrRecord(
            schema_version=1,
            ts=_iso_now(),
            producer_id=OVERRUN_PRODUCER_ID,
            kind=_FLIGHT_HEADER_KIND,
            payload=_flight_header_to_payload(header),
        )
        try:
            self._append_record(record)
        except OSError as exc:
            self._fail_open(f"failed to write flight_header: {exc}", cause=exc)
        self._opened = True
        self._log.info(
            "fdr.flight_open",
            extra={"kv": {"flight_id": str(self._flight_id)}},
        )

    def close_flight(self) -> FlightFooter:
        """Drain producers, write the flight footer and release resources.

        Returns:
            The footer that was emitted. A repeated call returns the stored
            footer of the first call.

        Raises:
            FdrCloseWithoutOpenError: If no ``open_flight`` succeeded before.
            FdrAlreadyClosedError: If already closed and no footer is stored.
        """
        if not self._opened:
            raise FdrCloseWithoutOpenError(
                "close_flight called without a prior successful open_flight"
            )
        if self._closed:
            if self._stored_footer is not None:
                return self._stored_footer
            raise FdrAlreadyClosedError(
                "close_flight called twice and no stored footer is available"
            )

        # Drain pending producer records BEFORE emitting the footer so the
        # counters reflect the entire flight.
        self._stop_writer_thread()
        self._drain_all(final=True)

        # Build a footer whose ``bytes_written`` payload includes the
        # footer's own framed size. Because ``bytes_written`` is itself
        # an integer field on the wire, growing it can change the
        # serialised length by 1 byte at decimal-power boundaries; we
        # iterate until the value is stable. Practically this converges
        # in one or two passes.
        ts = _iso_now()
        mono_ns = self._clock.monotonic_ns()
        records_written_now = self._records_written + 1  # +1 for the footer itself
        bytes_estimate = self._bytes_written
        footer: FlightFooter | None = None
        footer_record: FdrRecord | None = None
        for _ in range(8):
            footer = FlightFooter(
                flight_id=self._flight_id,
                flight_ended_at_iso=ts,
                flight_ended_at_monotonic_ns=mono_ns,
                records_written=records_written_now,
                records_dropped_overrun=self._records_dropped_overrun,
                bytes_written=bytes_estimate,
                rollover_count=self._rollover_count,
                clean_shutdown=True,
            )
            footer_record = FdrRecord(
                schema_version=1,
                ts=footer.flight_ended_at_iso,
                producer_id=OVERRUN_PRODUCER_ID,
                kind=_FLIGHT_FOOTER_KIND,
                payload=_flight_footer_to_payload(footer),
            )
            body = serialise(footer_record)
            framed_size = _LENGTH_PREFIX.size + len(body)
            next_estimate = self._bytes_written + framed_size
            if next_estimate == bytes_estimate:
                break
            bytes_estimate = next_estimate
        assert footer is not None and footer_record is not None

        try:
            self._append_record(footer_record)
        except OSError as exc:
            self._handle_write_failure(exc)

        self._close_segment(fsync=True)
        self._release_filelock()
        self._closed = True
        self._stopped = True
        self._stored_footer = footer
        self._log.info(
            "fdr.flight_close",
            extra={
                "kv": {
                    "records_written": footer.records_written,
                    "records_dropped_overrun": footer.records_dropped_overrun,
                    "bytes_written": footer.bytes_written,
                    "rollover_count": footer.rollover_count,
                    "clean_shutdown": True,
                }
            },
        )
        return footer

    def _stop_writer_thread(self) -> bool:
        """Signal the writer thread to stop and wait for it to exit.

        Returns:
            True if no writer thread is running any more (never started,
            already finished, or this call is made from the writer thread
            itself); False if it is still alive after the join timeout.
        """
        self._stop_event.set()
        thread = self._thread
        if thread is None or thread is threading.current_thread():
            return True
        thread.join(timeout=_THREAD_JOIN_TIMEOUT_S)
        if thread.is_alive():
            self._log.error(
                "c13.writer.join_timeout",
                extra={"kv": {"timeout_s": _THREAD_JOIN_TIMEOUT_S}},
            )
            return False
        return True

    # ------------------------------------------------------------------
    # Writer thread loop.

    def _writer_loop(self) -> None:
        """Thread body: drain all producers until the stop event is set."""
        try:
            while not self._stop_event.is_set():
                drained = self._drain_all(final=False)
                if drained == 0:
                    # Nothing pending: sleep, but wake immediately on stop.
                    self._stop_event.wait(timeout=self._drain_sleep_s)
        except BaseException as exc:
            self._exit_code_thrown = exc
            self._handle_write_failure(exc if isinstance(exc, OSError) else OSError(str(exc)))

    def _drain_all(self, *, final: bool) -> int:
        """Drain every registered client once; return the records dequeued.

        With ``final=True`` each client is drained until it is empty (see
        ``_drain_one``).
        """
        drained = 0
        for client in self._fdr_clients:
            drained += self._drain_one(client, final=final)
        return drained

    def _drain_one(self, client: FdrClient, *, final: bool) -> int:
        """Dequeue records from *client* and append those the policy allows.

        A regular pass (``final=False``) takes a single batch of at most
        ``config.batch_size`` records. A final pass keeps pulling batches
        until the client returns an empty one, so records queued beyond one
        batch are not lost at shutdown; it gives up after
        ``_FINAL_DRAIN_MAX_BATCHES`` batches.

        Returns:
            The number of records dequeued (written or not).
        """
        drained = 0
        batches = 0
        while True:
            batch = client.drain(max_records=self._config.batch_size)
            for record in batch:
                self._observe_overrun_record(record)
                if self._record_kind_policy is not None:
                    decision = self._record_kind_policy.gate_for_writer(record)
                    if decision is GateDecision.DROP:
                        continue
                try:
                    self._append_record(record)
                except OSError as exc:
                    self._handle_write_failure(exc)
                    # Continue dequeuing producer buffers so they don't grow
                    # unboundedly even in degraded mode (AC-5 part d).
                    continue
            self._emit_pending_policy_overrun()
            drained += len(batch)
            batches += 1
            if not final or len(batch) == 0 or batches >= _FINAL_DRAIN_MAX_BATCHES:
                return drained

    def _emit_pending_policy_overrun(self) -> None:
        """Write the policy's pending overrun record, if it has one."""
        if self._record_kind_policy is None:
            return
        overrun = self._record_kind_policy.drain_pending_overrun()
        if overrun is None:
            return
        self._observe_overrun_record(overrun)
        try:
            self._append_record(overrun)
        except OSError as exc:
            self._handle_write_failure(exc)

    def _observe_overrun_record(self, record: FdrRecord) -> None:
        """Add an overrun record's ``dropped_count`` to the drop counter.

        Records of any other kind, and non-integer or non-positive counts,
        are ignored.
        """
        if record.kind != OVERRUN_KIND:
            return
        dropped = record.payload.get("dropped_count", 0)
        if isinstance(dropped, int) and dropped > 0:
            self._records_dropped_overrun += dropped

    # ------------------------------------------------------------------
    # Segment file lifecycle.

    def _segment_path(self, index: int) -> Path:
        """Return ``<flight_dir>/segment-<index, 4 digits>.fdr``."""
        return self._flight_dir / f"segment-{index:04d}.fdr"

    def _open_segment(self, index: int) -> None:
        """Open (creating if needed) segment *index* for appending.

        Raises:
            FdrOpenError: If the file cannot be opened.
        """
        path = self._segment_path(index)
        flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND
        try:
            self._segment_fd = os.open(path, flags, 0o644)
        except OSError as exc:
            raise FdrOpenError(f"failed to open segment {index} at {path}: {exc}") from exc
        self._segment_bytes = 0

    def _close_segment(self, *, fsync: bool) -> None:
        """Close the open segment, optionally fsync-ing it first; no-op if none."""
        if self._segment_fd is None:
            return
        try:
            if fsync:
                os.fsync(self._segment_fd)
        except OSError:
            # Best-effort fsync on close; the failure is already accounted
            # for upstream (degraded mode). Continue with close.
            pass
        try:
            os.close(self._segment_fd)
        finally:
            self._segment_fd = None

    def _rotate_segment(self) -> None:
        """Close the current segment and atomically open the next one."""
        self._is_rolling = True
        try:
            previous = self._segment_index
            self._close_segment(fsync=True)
            self._segment_index += 1
            self._open_segment(self._segment_index)
            self._rollover_count += 1
            self._log.info(
                "c13.writer.segment_rotated",
                extra={"kv": {"previous": previous, "current": self._segment_index}},
            )
            if self._on_rotation is not None:
                # The cap-policy hook runs synchronously after each rotation
                # (AZ-293 Risk 3). Exceptions are caught to keep the writer
                # alive — the policy is responsible for its own error
                # surfacing.
                try:
                    self._on_rotation(self, previous)
                except Exception as exc:
                    self._log.error(
                        "c13.writer.on_rotation_failed",
                        extra={"kv": {"error": repr(exc)}},
                    )
        finally:
            self._is_rolling = False

    # ------------------------------------------------------------------
    # Record append + length framing.

    def _append_record(self, record: FdrRecord) -> None:
        """Serialise *record*, length-prefix it and append it to the segment.

        Rotates to the next segment first when the frame would push a
        non-empty segment past ``config.segment_size_bytes``.

        Raises:
            FdrWriterError: If no segment is open.
            OSError: If the write fails or makes no progress. Bytes that
                did reach the file are still added to the byte counters;
                ``records_written`` only counts complete frames.
        """
        with self._append_lock:
            if self._segment_fd is None:
                raise FdrWriterError("append_record called with no open segment")
            body = serialise(record)
            frame = _LENGTH_PREFIX.pack(len(body)) + body

            # Per-segment cap check happens BEFORE write so the rotation
            # threshold matches AC-2's "≤ cap + one record overshoot"
            # interpretation: a record that would push past the cap goes into
            # the NEXT segment.
            cap = self._config.segment_size_bytes
            if cap > 0 and self._segment_bytes > 0 and self._segment_bytes + len(frame) > cap:
                self._rotate_segment()

            # os.write may write fewer bytes than requested (e.g. when the
            # disk is nearly full), so keep going until the frame is out.
            pending = memoryview(frame)
            landed = 0
            try:
                while landed < len(frame):
                    written = os.write(self._segment_fd, pending[landed:])
                    if written <= 0:
                        raise OSError(errno.EIO, "os.write made no progress")
                    landed += written
            finally:
                # Keep the size accounting equal to what is really on disk,
                # even when the frame was only partially written.
                self._segment_bytes += landed
                self._bytes_written += landed
            self._records_written += 1
            if self._config.debug_log_per_record:
                self._log.debug("c13.writer.append", extra={"kv": {"kind": record.kind}})

    # ------------------------------------------------------------------
    # Cross-process filelock.

    def _acquire_filelock(self) -> None:
        """Take a non-blocking exclusive ``flock`` on ``flight_root/.fdr.lock``.

        Raises:
            FdrConcurrentWriterError: If the lock cannot be taken.
        """
        self._flight_root.mkdir(parents=True, exist_ok=True)
        self._lock_fd = os.open(self._lock_path, os.O_RDWR | os.O_CREAT, 0o644)
        try:
            fcntl.flock(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError as exc:
            os.close(self._lock_fd)
            self._lock_fd = None
            raise FdrConcurrentWriterError(
                f"another writer holds {self._lock_path}: {exc}"
            ) from exc

    def _release_filelock(self) -> None:
        """Unlock and close the lock file descriptor; no-op if not held."""
        if self._lock_fd is None:
            return
        try:
            fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
        finally:
            os.close(self._lock_fd)
            self._lock_fd = None

    # ------------------------------------------------------------------
    # Failure handling.

    def _fail_open(self, message: str, *, cause: BaseException | None = None) -> None:
        """Roll back a failed ``open_flight`` and raise ``FdrOpenError``.

        Always raises; never returns normally.

        Raises:
            FdrOpenError: Always, chained to *cause* when one is given.
        """
        # Stop the writer thread FIRST. Otherwise it keeps running against a
        # closed segment (spurious degraded mode + GCS alert), stop() can no
        # longer join it because _started is reset below, and a retried
        # start() would add a second consumer on the producer queues.
        if self._stop_writer_thread():
            self._thread = None
            # Re-arm the event so a retried start() gets a working thread.
            self._stop_event.clear()

        # An open_flight failure rolls back: close the segment, unlink it
        # (it can only contain a partially-written header), release the
        # filelock so the next attempt isn't permanently stuck.
        seg_path = self._segment_path(self._segment_index)
        self._close_segment(fsync=False)
        try:
            if seg_path.exists() and seg_path.stat().st_size == 0:
                seg_path.unlink()
        except OSError:
            # Rollback unlink is best-effort: a zero-byte stray segment is
            # harmless (the next open_flight scans + skips empty segments),
            # so any unlink failure here MUST NOT mask the underlying
            # FdrOpenError that this _fail_open call is about to raise.
            pass
        self._release_filelock()
        self._started = False
        if cause is None:
            raise FdrOpenError(message)
        raise FdrOpenError(message) from cause

    def _handle_write_failure(self, exc: BaseException) -> None:
        """Record a write failure: rate-limited log, one GCS alert, degraded flag.

        The error is logged at most once per ``_LOG_FAILURE_RATE_LIMIT_S``
        seconds; the GCS alert fires only for the first failure.
        """
        errno_value = getattr(exc, "errno", None)
        now = time.monotonic()
        if now - self._last_failure_log_t >= _LOG_FAILURE_RATE_LIMIT_S:
            self._last_failure_log_t = now
            self._log.error(
                "fdr.write_failure",
                extra={"kv": {"errno": errno_value, "error": repr(exc)}},
            )
        if not self._gcs_alerted_for_write_failure:
            self._gcs_alerted_for_write_failure = True
            # An OSError always has an ``errno`` attribute, but it is None
            # when the error was built from a message only; report EIO then.
            alert_errno = errno_value if errno_value is not None else errno.EIO
            try:
                self._gcs_alert(
                    "FDR write failure — companion in degraded mode "
                    f"(errno={alert_errno})"
                )
            except Exception:
                # GCS alert failure is logged but does not unwind further.
                self._log.error(
                    "fdr.gcs_alert_failed",
                    extra={"kv": {"on": "write_failure"}},
                )
        self._is_degraded = True