# Replay: ReplaySink Protocol + JsonlReplaySink

**Task**: AZ-400_replay_jsonl_sink

**Name**: `ReplaySink` Protocol + `JsonlReplaySink` impl

**Description**: Define the `ReplaySink` Protocol (PEP 544, `@runtime_checkable`) at `src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py`. Placement follows `module-layout.md`, and the sink is gated by `BUILD_REPLAY_SINK_JSONL`. Implement `JsonlReplaySink`:

- Open a writable file at `output_path`. At construction, check that `output_path`'s parent directory exists and raise `ReplaySinkError` if it does not.
- `emit(EstimatorOutput)` writes exactly one JSON object per line (Invariant 7) as `orjson.dumps(record) + b"\n"`. Here `record` is `dataclasses.asdict(output)` with the frozen-DTO conversions below applied.
- `close()` flushes and fsyncs the file, then closes it.

Bounded-write semantics: open the file unbuffered (`buffering=0`, which Python only allows in binary mode). Each `emit` then hands its line to the OS immediately, so no records accumulate in user space. Unbuffered writes make lines visible to readers but do not make them durable. The explicit fsync on `close()` is the durability guarantee.

Frozen-DTO serialisation:

- `EstimatorOutput.covariance_6x6` (numpy array) → flat, row-major list of 36 floats.
- `EstimatorOutput.source_label` (enum) → the member name as a string.
- `EstimatorOutput.captured_at` (int, `monotonic_ns`) → int.

The first two conversions must be applied explicitly before `orjson.dumps`, for three reasons: orjson rejects a bare `ndarray`, it emits a nested 6×6 list under `OPT_SERIALIZE_NUMPY`, and it serialises enum members as their values.

**Complexity**: 3 points

**Dependencies**: AZ-263, AZ-269, AZ-270, AZ-381 (`EstimatorOutput` DTO), AZ-266; AZ-272 (FDR for sink-open/close events)

**Component**: c8_fc_adapter (epic AZ-265 / E-DEMO-REPLAY). The sink lives in `c8_fc_adapter/replay_sink.py` per `module-layout.md`.

**Tracker**: AZ-400

**Epic**: AZ-265 (E-DEMO-REPLAY)

### Document Dependencies

- `_docs/02_document/contracts/replay/replay_protocol.md`: `ReplaySink` Protocol; Invariant 7.
- `_docs/02_document/contracts/c5_state/state_estimator_protocol.md`: `EstimatorOutput` DTO shape.
- `_docs/02_document/module-layout.md`: sink placement under `c8_fc_adapter/`.

## Problem

Without this task, the replay binary has nowhere to emit `EstimatorOutput`. The live binary emits to the FC via `PymavlinkArdupilotAdapter`, but in replay there is no FC.
The `JsonlReplaySink` produces the JSONL file (one estimate per line) that the parent-suite UI demo consumes. This file is the deliverable artefact of every replay run.

## Outcome

- `src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py`: `ReplaySink` Protocol + `JsonlReplaySink` impl.
- Re-export of `ReplaySink` from `c8_fc_adapter/__init__.py` (already declared in the `module-layout.md` Public API).
- `JsonlReplaySink.__init__(output_path: Path, fdr_client: FdrClient)`.
- `JsonlReplaySink.emit(EstimatorOutput)`: writes one orjson-serialised line.
- `JsonlReplaySink.close()`: fsync + close.
- INFO log on construction: `kind="replay.sink.opened"` with `{output_path}`.
- INFO log on close: `kind="replay.sink.closed"` with `{output_path, lines_written}`.
- DEBUG log every 1000 emits: `kind="replay.sink.emit_progress"`.
- Unit tests: Protocol conformance, one line per emit, valid JSON matching the schema, numpy → flat-list serialisation, enum → string, missing parent dir raises, double close idempotent, build-flag gating.

## Scope

### Included

- Protocol + impl, plus a module-level `create(...)` factory (per project convention) that `compose_replay` will call. The `compose_replay` wiring itself is done in the next task.
- orjson-based serialisation.
- Bounded-write semantics + fsync-on-close.
- Frozen-DTO serialisation (numpy arrays + enums).
- Build-flag gating.
- Unit tests.

### Excluded

- `compose_replay` integration: owned by the next task.
- CLI `--output` arg parsing: owned by the CLI task.
- E2E replay fixture test: owned by the E2E task.

## Acceptance Criteria

**AC-1: Protocol conformance.** With the `runtime_checkable` Protocol, `isinstance(JsonlReplaySink(...), ReplaySink)` returns True.

**AC-2: One JSON object per emit.** Emit 100 `EstimatorOutput` records, then close the sink and reopen the file to reread it. Assert the file has exactly 100 lines and that each line parses as a JSON object via `json.loads`.

**AC-3: Schema match.** Emit a known `EstimatorOutput`. Assert that the parsed JSON's keys equal the field names in `EstimatorOutput.__dataclass_fields__`, with every field present and no extras.

**AC-4: numpy → flat list.** Emit an `EstimatorOutput` with `covariance_6x6 = np.eye(6)`. Assert that the parsed JSON's `covariance_6x6` is a list of 36 floats in row-major order, `[1.0, 0.0, 0.0, ..., 0.0, 1.0]`. That is, `1.0` appears at indices 0, 7, 14, 21, 28 and 35, and `0.0` appears everywhere else.

**AC-5: enum → string.** Emit with `source_label = SATELLITE_ANCHORED`. Assert that the parsed JSON's `source_label` is the string `"SATELLITE_ANCHORED"` (NOT the integer enum value).

**AC-6: missing parent dir raises.** `JsonlReplaySink(Path("/nonexistent/dir/out.jsonl"), fdr_client)` raises `ReplaySinkError("output parent directory does not exist: /nonexistent/dir")`.

**AC-7: close fsyncs.** Emit 100 records and close. Assert that the file size on disk equals the total byte length of the 100 serialised lines. Also assert, for example with a spy on `os.fsync`, that fsync was called on the sink's file descriptor before the file was closed. The file's modification time is not a usable smoke check: fsync does not change it, and the writes have already updated it.

**AC-8: double close idempotent.** Call `close()` twice. The second call does nothing except emit a DEBUG log `kind="replay.sink.double_close"`.

**AC-9: lines_written reported on close.** Close after 100 emits. The INFO log carries `lines_written=100`.

**AC-10: Build-flag gating.** With `BUILD_REPLAY_SINK_JSONL=OFF`, constructing `JsonlReplaySink` raises `ReplaySinkConfigError("BUILD_REPLAY_SINK_JSONL is OFF...")`.

## Non-Functional Requirements

- `emit` p99 ≤ 1 ms. orjson serialisation is microsecond-class, so the write syscall dominates.
- `close()` p99 ≤ 50 ms (fsync round-trip).
- Memory: bounded by one serialised line. Writes are unbuffered and no records are retained in memory.

## Constraints

- orjson is the chosen serialiser because it is faster than stdlib `json`. Output key order follows the dataclass field order produced by `dataclasses.asdict`, which gives the deterministic key ordering required by Invariant 10's determinism floor.
- The file is opened in explicit binary mode (`"wb"`) to avoid platform-specific newline translation. Binary mode is also required for `buffering=0`.
- Parent-directory existence is validated at construction (fail-fast).
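
The Description and Constraints above can be sketched as follows. The sketch assumes an `EstimatorOutput`-like frozen dataclass with `covariance_6x6` and `source_label` fields, and it is not the project's implementation. Several parts are simplified or left out:

- `fdr_client` is accepted but unused, so FDR events are omitted.
- Logging uses stdlib `logging`, with the `kind` string as the message.
- Build-flag gating (AC-10) and the `create(...)` factory are omitted because their runtime shape is not specified here.
- The stdlib `json` fallback and the nested-sequence branch for `covariance_6x6` are sketch-only assumptions. They exist so the code runs without orjson or numpy installed.

```python
import dataclasses
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Protocol, runtime_checkable

try:
    import orjson

    def _dumps(obj: Any) -> bytes:
        return orjson.dumps(obj)
except ImportError:  # sketch-only fallback; the spec mandates orjson
    def _dumps(obj: Any) -> bytes:
        return json.dumps(obj, separators=(",", ":")).encode("utf-8")

log = logging.getLogger(__name__)


class ReplaySinkError(Exception):
    """Raised when the sink cannot be opened or used."""


@runtime_checkable
class ReplaySink(Protocol):
    def emit(self, output: Any) -> None: ...

    def close(self) -> None: ...


def _to_record(output: Any) -> Dict[str, Any]:
    """Frozen DTO -> JSON-ready dict (key order = dataclass field order)."""
    record = dataclasses.asdict(output)
    cov = record["covariance_6x6"]
    if hasattr(cov, "ravel"):  # numpy array: row-major ("C") flatten to 36 values
        flat = cov.ravel(order="C").tolist()
    else:  # assumption: nested 6x6 sequence, so the sketch runs without numpy
        flat = [x for row in cov for x in row]
    record["covariance_6x6"] = [float(x) for x in flat]
    record["source_label"] = output.source_label.name  # enum name, not its value
    return record


class JsonlReplaySink:
    """One JSON object per line; unbuffered writes; fsync on close."""

    def __init__(self, output_path: Path, fdr_client: Any = None) -> None:
        parent = output_path.parent
        if not parent.is_dir():  # fail fast before opening the file
            raise ReplaySinkError(f"output parent directory does not exist: {parent}")
        self._path = output_path
        self._fdr = fdr_client  # hypothetical: unused here, FDR events omitted
        # "wb" avoids newline translation; buffering=0 is only legal in binary mode.
        self._file = open(output_path, "wb", buffering=0)
        self._lines_written = 0
        self._closed = False
        log.info("replay.sink.opened", extra={"output_path": str(output_path)})

    @property
    def lines_written(self) -> int:
        return self._lines_written

    def emit(self, output: Any) -> None:
        if self._closed:
            raise ReplaySinkError("emit() called after close()")
        remaining = memoryview(_dumps(_to_record(output)) + b"\n")
        while remaining:  # raw FileIO.write may write fewer bytes than requested
            written = self._file.write(remaining)
            remaining = remaining[written:]
        self._lines_written += 1
        if self._lines_written % 1000 == 0:
            log.debug("replay.sink.emit_progress",
                      extra={"lines_written": self._lines_written})

    def close(self) -> None:
        if self._closed:  # second and later calls only log
            log.debug("replay.sink.double_close", extra={"output_path": str(self._path)})
            return
        self._closed = True
        try:
            self._file.flush()  # no-op for unbuffered raw I/O
            os.fsync(self._file.fileno())  # the durability guarantee
        finally:
            self._file.close()
        log.info("replay.sink.closed",
                 extra={"output_path": str(self._path), "lines_written": self._lines_written})
```

With `buffering=0`, each `emit` issues its own `write` call, so complete lines become visible to a concurrent reader before `close()`. The loop is there because a raw `FileIO.write` may return a short count, which a buffered writer would otherwise absorb internally.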

## Risks & Mitigation

- **Risk: orjson does not serialise numpy arrays as flat lists.** *Mitigation*: flatten explicitly in row-major order (e.g. `covariance_6x6.ravel().tolist()`) before `orjson.dumps`. Passing `option=orjson.OPT_SERIALIZE_NUMPY` alone would emit a nested 6×6 list and fail AC-4. Verified in AC-4.
- **Risk: orjson serialises enum members as their values.** *Mitigation*: replace `source_label` with its `.name` before `orjson.dumps`. Verified in AC-5.
- **Risk: fsync hangs on slow disks.** *Mitigation*: documented, and `close()` is best-effort. Because the JSONL is written line by line, a hung close still leaves a file whose complete lines are readable.
- **Risk: large covariance arrays inflate line size.** *Mitigation*: none needed. A 36-float list is ~720 bytes, so a 60 s run at 5 Hz produces 300 records × ~1 KB ≈ 300 KB in total.

## Runtime Completeness

- **Named capability**: offline `EstimatorOutput` sink for replay.
- **Production code**: real orjson-based serialiser, real fsync-on-close.
- **Allowed external stubs**: test fakes only.
- **Unacceptable substitutes**: an in-memory list returned at end of replay, which defeats streaming and UI consumption.

## Contract

Implements `_docs/02_document/contracts/replay/replay_protocol.md`: `ReplaySink` Protocol + `JsonlReplaySink`; Invariant 7.
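
The fsync-hang mitigation under Risks & Mitigation assumes that the consumer only trusts newline-terminated lines. A process killed mid-`emit` can leave a truncated final line. The minimal reader below is a sketch under that assumption. `read_replay_jsonl` is a hypothetical name, and this task does not specify the parent-suite UI's actual reader.

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterator


def read_replay_jsonl(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield each complete JSONL record; stop at a truncated trailing line."""
    with open(path, "rb") as f:
        for raw in f:  # binary iteration keeps the trailing b"\n" on each line
            if not raw.endswith(b"\n"):
                break  # final line lacks its newline: the writer never finished it
            yield json.loads(raw)
```

Stopping at the first unterminated line is deliberately conservative. A record whose JSON is complete but whose newline never reached the file is dropped rather than trusted.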