Oleksandr Bezdieniezhnykh fa3742d582 [AZ-399] [AZ-400] C8 TlogReplayFcAdapter + ReplaySink + JsonlReplaySink
Opens E-DEMO-REPLAY (AZ-265): the two C8 strategies that let the
upcoming compose_replay (AZ-401) and gps-denied-replay CLI (AZ-402)
run the production C1-C5 pipeline against a recorded (.tlog, video)
pair without touching live FC I/O.

AZ-400 lands the contract ReplaySink Protocol (emit + close per
replay_protocol.md v1.0.0) and JsonlReplaySink: orjson-serialised
JSONL, fsync-on-close, build-flag gated (BUILD_REPLAY_SINK_JSONL),
double-close idempotent, FDR mirror on open/close. The drifted
AZ-390 stub in interface.py is removed; the canonical Protocol now
lives in replay_sink.py per module-layout.md and is re-exported via
__init__.py. AZ-390 conformance test widened.

AZ-399 lands TlogReplayFcAdapter: full FcAdapter Protocol surface,
build-flag gated (BUILD_TLOG_REPLAY_ADAPTER), pymavlink stream-parse
with bounded pre-scan + fail-fast on missing required messages
(R-DEMO-3), dedicated decode thread feeding the existing AZ-391
SubscriptionBus. Outbound surface raises FcEmitError per Invariant 5;
request_source_set_switch raises SourceSetSwitchNotSupportedError.
Pacing honours Invariant 6 via Clock.sleep_until_ns. time_offset_ms
shifts every emitted received_at per Invariant 8. Non-monotonic
timestamps raise FcOpenError.

Test coverage: 188 c8_fc_adapter tests pass; 1 skipped (AZ-399 AC-1
500 MB tlog RSS bound, deferred to AZ-404 e2e behind RUN_REPLAY_E2E).
Code review: PASS_WITH_WARNINGS — 1 Medium (mapping logic duplicates
AZ-391 live decoder; intentional today, four behavioural deltas
documented), 2 Low.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-14 05:33:20 +03:00

Replay — ReplaySink Protocol + JsonlReplaySink

Task: AZ-400_replay_jsonl_sink
Name: ReplaySink Protocol + JsonlReplaySink impl
Description: Define the ReplaySink Protocol (PEP 544, @runtime_checkable) at src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py (placement per module-layout.md; gated by BUILD_REPLAY_SINK_JSONL). Implement JsonlReplaySink:
  • Open a writable file at output_path.
  • emit(EstimatorOutput) writes exactly one JSON object per line (Invariant 7), via orjson.dumps(dataclasses.asdict(output)) + b"\n" after the field conversions below.
  • close() flushes and fsyncs the file.
At construction, validate that output_path's parent directory exists; raise ReplaySinkError if it does not.
Bounded-write semantics: open the file with buffering=0 (unbuffered) so each emit reaches the OS immediately. The explicit fsync in close() remains the durability guarantee.
Frozen-DTO serialisation:
  • EstimatorOutput.covariance_6x6 (numpy array) → flat row-major list of 36 floats.
  • EstimatorOutput.source_label (enum) → its string name.
  • EstimatorOutput.captured_at (int monotonic_ns) → int.
orjson does not apply these conversions by itself. It serialises Enum members by value, and with OPT_SERIALIZE_NUMPY it writes a 6×6 array as a nested list, so both conversions must be done explicitly before dumps.
Complexity: 3 points
Dependencies: AZ-263, AZ-269, AZ-270, AZ-381 (EstimatorOutput DTO), AZ-266; AZ-272 (FDR for sink-open/close events)
Component: c8_fc_adapter (epic AZ-265 / E-DEMO-REPLAY); the sink lives in c8_fc_adapter/replay_sink.py per module-layout.md
Tracker: AZ-400
Epic: AZ-265 (E-DEMO-REPLAY)
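A minimal sketch of the Protocol surface described above. It assumes only the two methods the contract names, emit and close. EstimatorOutput is typed as Any because the real DTO lives in the C5 contract. The two exception classes and their Exception base are assumptions for illustration.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


class ReplaySinkError(Exception):
    """Assumed: raised when the sink cannot be opened (e.g. missing parent dir)."""


class ReplaySinkConfigError(Exception):
    """Assumed: raised when the sink is disabled by its build flag."""


@runtime_checkable
class ReplaySink(Protocol):
    """Offline destination for EstimatorOutput records during replay."""

    def emit(self, output: Any) -> None:
        """Persist exactly one EstimatorOutput record (one JSONL line)."""
        ...

    def close(self) -> None:
        """Flush, fsync and release the file; safe to call twice."""
        ...
```

A @runtime_checkable isinstance check only verifies that emit and close exist as attributes. It does not check their signatures, so AC-1 is a structural smoke test rather than a type check.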

Document Dependencies

  • _docs/02_document/contracts/replay/replay_protocol.md: ReplaySink Protocol; Invariant 7.
  • _docs/02_document/contracts/c5_state/state_estimator_protocol.md: EstimatorOutput DTO shape.
  • _docs/02_document/module-layout.md: sink placement under c8_fc_adapter/.

Problem

Without this task, the replay binary has nowhere to emit EstimatorOutput — the live binary emits to the FC via PymavlinkArdupilotAdapter, but in replay there is no FC. The JsonlReplaySink produces the JSONL file the parent-suite UI demo consumes (one estimate per line) — the deliverable artefact of every replay run.

Outcome

  • src/gps_denied_onboard/components/c8_fc_adapter/replay_sink.py: ReplaySink Protocol + JsonlReplaySink impl.
  • Re-export of ReplaySink from c8_fc_adapter/__init__.py (already declared in module-layout.md Public API).
  • JsonlReplaySink.__init__(output_path: Path, fdr_client: FdrClient).
  • JsonlReplaySink.emit(EstimatorOutput) — one orjson-serialised line.
  • JsonlReplaySink.close() — fsync + close.
  • INFO log on construction: kind="replay.sink.opened" with {output_path}.
  • INFO log on close: kind="replay.sink.closed" with {output_path, lines_written}.
  • DEBUG log every 1000 emits: kind="replay.sink.emit_progress".
  • Unit tests: Protocol conformance, one-line-per-emit, JSON valid + matches schema, numpy → flat list serialisation, enum → string, missing parent dir raises, double close idempotent, build-flag gating.
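The Outcome items above could fit together as in the sketch below. It is not the actual implementation, and several parts are hypothetical:
  • FdrClient exposes record(kind, payload).
  • Structured log fields travel via logging's extra=.
  • The logger name is made up.
  • Stdlib json stands in when orjson is not installed.
  • emit accepts any dataclass in place of EstimatorOutput.
  • Build-flag gating is left out.

```python
from __future__ import annotations

import dataclasses
import enum
import json
import logging
import os
from pathlib import Path
from typing import Any, Protocol

try:  # orjson is the chosen serialiser; stdlib json is only a fallback so the sketch runs anywhere.
    import orjson

    def _dumps(obj: Any) -> bytes:
        return orjson.dumps(obj)

except ImportError:

    def _dumps(obj: Any) -> bytes:
        return json.dumps(obj, separators=(",", ":")).encode("utf-8")


log = logging.getLogger("gps_denied_onboard.c8_fc_adapter.replay_sink")  # hypothetical name


class ReplaySinkError(Exception):
    pass


class FdrClient(Protocol):  # hypothetical: the real FDR client API is not specified here
    def record(self, kind: str, payload: dict[str, Any]) -> None: ...


def _jsonable(value: Any) -> Any:
    """Convert asdict() output into JSON-native types."""
    if isinstance(value, enum.Enum):
        return value.name  # orjson would otherwise emit the member's value
    if hasattr(value, "ndim") and hasattr(value, "tolist"):
        # numpy array -> flat row-major list; numpy scalar (ndim 0) -> Python scalar
        return value.ravel().tolist() if value.ndim else value.tolist()
    if isinstance(value, dict):
        return {k: _jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_jsonable(v) for v in value]
    return value


class JsonlReplaySink:
    """Sketch: one JSON object per emit, unbuffered writes, fsync on close."""

    _PROGRESS_EVERY = 1000

    def __init__(self, output_path: Path, fdr_client: FdrClient) -> None:
        if not output_path.parent.is_dir():
            raise ReplaySinkError(f"output parent directory does not exist: {output_path.parent}")
        self._path = output_path
        self._fdr = fdr_client
        # "wb" avoids newline translation; buffering=0 hands every write straight to the OS.
        self._file = open(output_path, "wb", buffering=0)
        self._lines = 0
        self._closed = False
        self._event("replay.sink.opened", {"output_path": str(output_path)})

    def emit(self, output: Any) -> None:
        data = memoryview(_dumps(_jsonable(dataclasses.asdict(output))) + b"\n")
        while data:  # raw FileIO.write may write fewer bytes than requested
            data = data[self._file.write(data):]
        self._lines += 1
        if self._lines % self._PROGRESS_EVERY == 0:
            log.debug("emit progress", extra={"kind": "replay.sink.emit_progress", "lines_written": self._lines})

    def close(self) -> None:
        if self._closed:
            log.debug("close() called twice", extra={"kind": "replay.sink.double_close"})
            return
        self._closed = True
        try:
            os.fsync(self._file.fileno())  # the durability guarantee
        finally:
            self._file.close()
        self._event("replay.sink.closed", {"output_path": str(self._path), "lines_written": self._lines})

    def _event(self, kind: str, payload: dict[str, Any]) -> None:
        log.info(kind, extra={"kind": kind, **payload})
        self._fdr.record(kind, payload)  # FDR mirror (AZ-272)
```

The explicit _jsonable pass is what makes AC-4 and AC-5 hold: it flattens the covariance row-major and writes the enum's name. The write loop exists because raw (unbuffered) file objects are allowed to perform short writes.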

Scope

Included

  • Protocol + impl, plus a module-level create(...) factory per project convention. Wiring that factory into compose_replay is done in the next task.
  • orjson-based serialisation.
  • Bounded-write semantics + fsync-on-close.
  • Frozen DTO serialisation (numpy arrays + enums).
  • Build-flag gating.
  • Unit tests.

Excluded

  • compose_replay integration — owned by next task.
  • CLI --output arg parsing — owned by CLI task.
  • E2E replay fixture test — owned by E2E task.

Acceptance Criteria

AC-1: Protocol conformance. isinstance(JsonlReplaySink(...), ReplaySink) against the @runtime_checkable Protocol returns True.

AC-2: One JSON per emit — emit 100 EstimatorOutput records, close the sink, then reopen the file and reread it. Assert that the file has exactly 100 lines and that each line parses as a valid JSON object via json.loads.

AC-3: Schema match — emit a known EstimatorOutput; assert the parsed JSON has keys matching EstimatorOutput.__dataclass_fields__ (full coverage of all fields).

AC-4: numpy → flat list — emit an EstimatorOutput with covariance_6x6 = np.eye(6); assert the parsed JSON's covariance_6x6 is a list of 36 floats with [1.0, 0.0, 0.0, ..., 0.0, 1.0] per row-major flatten.

AC-5: enum → string — emit with source_label = SATELLITE_ANCHORED; assert the parsed JSON's source_label is the string "SATELLITE_ANCHORED" (NOT the integer enum value).

AC-6: Missing parent dir raises. JsonlReplaySink(Path("/nonexistent/dir/out.jsonl"), fdr_client) raises ReplaySinkError("output parent directory does not exist: /nonexistent/dir").

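AC-7: close fsyncs — emit 100 records, then close. Assert that the file size on disk matches the expected serialised size. Also assert that os.fsync was called once with the sink's file descriptor, e.g. via a monkeypatched spy. The file's modified timestamp is not a usable signal: writes update it, and fsync does not.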

AC-8: double close idempotent — call close() twice. The second call is a no-op apart from a DEBUG log kind="replay.sink.double_close".

AC-9: lines_written reported on close — close after 100 emits; INFO log carries lines_written=100.

AC-10: Build-flag gating. With BUILD_REPLAY_SINK_JSONL=OFF, constructing JsonlReplaySink raises ReplaySinkConfigError("BUILD_REPLAY_SINK_JSONL is OFF...").
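AC-7 and AC-10 need test mechanics that are easy to get wrong. The sketch below proves fsync ran by spying on os.fsync with unittest.mock, rather than reading mtime, and it gates on the flag by reading it from the environment. Several parts are hypothetical: reading BUILD_REPLAY_SINK_JSONL from os.environ, treating an unset flag as OFF, and the helper names require_flag_on and close_durably.

```python
from __future__ import annotations

import os
import tempfile
from collections.abc import Mapping
from pathlib import Path
from unittest import mock

FLAG = "BUILD_REPLAY_SINK_JSONL"


class ReplaySinkConfigError(Exception):
    """Raised when the JSONL sink is disabled by its build flag."""


def require_flag_on(env: Mapping[str, str] | None = None) -> None:
    """AC-10: fail fast unless the flag is ON (env-var source is an assumption)."""
    value = (os.environ if env is None else env).get(FLAG, "OFF")
    if value.strip().upper() != "ON":
        raise ReplaySinkConfigError(f"{FLAG} is OFF; JsonlReplaySink is unavailable in this build")


def close_durably(f) -> None:
    """Hypothetical stand-in for JsonlReplaySink.close(): fsync, then close."""
    os.fsync(f.fileno())
    f.close()


def check_close_fsyncs() -> None:
    """AC-7: spy on os.fsync instead of trusting the file's mtime."""
    with tempfile.TemporaryDirectory() as d:
        path = Path(d) / "out.jsonl"
        payload = b"".join(b'{"i":%d}\n' % i for i in range(100))
        f = open(path, "wb", buffering=0)
        f.write(payload)
        fd = f.fileno()
        # wraps= keeps the real fsync running while recording the call.
        with mock.patch("os.fsync", wraps=os.fsync) as spy:
            close_durably(f)
        spy.assert_called_once_with(fd)
        assert path.stat().st_size == len(payload)
```

Patching "os.fsync" works because the code under test looks fsync up on the os module at call time.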

Non-Functional Requirements

  • emit p99 ≤ 1 ms (orjson is microsecond-class; file write dominates).
  • close() p99 ≤ 50 ms (fsync round-trip).
  • Memory: bounded by one serialised line at a time (buffering=0 means no userspace write buffer; no in-memory record retention).
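Here is a rough way to check the emit budget. It assumes a fixed, pre-serialised 36-float line, so only the unbuffered write is timed. A real benchmark would time JsonlReplaySink.emit on representative EstimatorOutput records. Results depend heavily on the disk, so the sketch reports p99 rather than asserting the 1 ms target.

```python
import statistics
import tempfile
import time
from pathlib import Path


def write_latency_p99_ns(n: int = 2000) -> int:
    """Time n unbuffered single-line writes and return the p99 latency in ns."""
    line = b'{"captured_at":0,"covariance_6x6":[' + b",".join([b"0.0"] * 36) + b"]}\n"
    samples = []
    with tempfile.TemporaryDirectory() as d:
        with open(Path(d) / "bench.jsonl", "wb", buffering=0) as f:
            for _ in range(n):
                t0 = time.perf_counter_ns()
                f.write(line)
                samples.append(time.perf_counter_ns() - t0)
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
    return int(statistics.quantiles(samples, n=100)[98])
```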

Constraints

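  • orjson is the chosen serialiser because it is faster than stdlib json. Key order is deterministic, satisfying Invariant 10's determinism floor: dataclasses.asdict preserves field-declaration order, and orjson preserves dict insertion order.
  • The file is opened in explicit binary mode ("wb") to avoid platform-specific newline translation. Binary mode is also required for buffering=0.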
  • Parent directory existence is validated at construction (fail-fast).

Risks & Mitigation

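  • Risk: orjson does not serialise numpy arrays by default. Mitigation: flatten covariance_6x6 row-major to 36 floats before dumps (e.g. ndarray.ravel().tolist()). Passing option=orjson.OPT_SERIALIZE_NUMPY on the unflattened array would emit a nested 6×6 list, not the flat list AC-4 expects. Verified in AC-4.
  • Risk: fsync hangs on slow disks. Mitigation: documented; close() is best-effort. The JSONL is written line by line, so every completed emit has already been handed to the OS and a hung close still leaves a partially readable file.
  • Risk: large covariance arrays inflate line size. Mitigation: the 36-float list is ~720 bytes per line. A 60 s run at 5 Hz is 300 records × ~1 KB ≈ 300 KB in total, which is trivial.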
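The size estimate can be checked with a worked example. It assumes typical full-precision floats: 1/3 serialises as 0.3333333333333333, which is 18 characters. Stdlib json with compact separators stands in for orjson's output.

```python
import json

# 36 full-precision floats, compact separators (no spaces), as in a JSONL line.
cov_flat = [1 / 3] * 36
encoded = json.dumps(cov_flat, separators=(",", ":")).encode("utf-8")
per_line_cov_bytes = len(encoded)   # 36 * 18 digits + 35 commas + 2 brackets
records = 60 * 5                    # 60 s run at 5 Hz
run_bytes_upper = records * 1024    # ~1 KB per full line, per the estimate in the risk note
print(per_line_cov_bytes, records, run_bytes_upper)  # 685 300 307200
```

Values printed in exponent form (e.g. 1.2345678901234567e-05) are a few characters longer, which is why ~720 bytes is a reasonable per-line figure.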

Runtime Completeness

  • Named capability: offline EstimatorOutput sink for replay.
  • Production code: real orjson-based serialiser, real fsync-on-close.
  • Allowed external stubs: test fakes only.
  • Unacceptable substitutes: in-memory list returned at end-of-replay (defeats streaming + UI consumption).

Contract

Implements _docs/02_document/contracts/replay/replay_protocol.md: ReplaySink Protocol + JsonlReplaySink; Invariant 7.