[AZ-526] Consolidate _iso_ts_from_clock into helpers/iso_timestamps

Closes cumulative review 46-48 F1 (Medium) + F3 (Low). Adds
iso_ts_from_clock(clock) alongside iso_ts_now() in the Layer-1
helper; migrates four duplicate definitions in c2_vpr (net_vlad,
ultra_vpr, _faiss_bridge) and c12_operator_orchestrator
(operator_reloc_service). Output format flipped +00:00 -> Z to
align with iso_ts_now() and the canonical FDR _TS fixture (FDR
schema test passes unmodified).

18 helper AC tests + 186 sibling tests pass; ruff clean.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-13 23:37:04 +03:00
parent fbeeab60b3
commit 5dfd9a577e
13 changed files with 540 additions and 82 deletions
+5 -1
View File
@@ -22,7 +22,10 @@ from gps_denied_onboard.helpers.imu_preintegrator import (
ImuPreintegrator,
make_imu_preintegrator,
)
from gps_denied_onboard.helpers.iso_timestamps import iso_ts_now
from gps_denied_onboard.helpers.iso_timestamps import (
iso_ts_from_clock,
iso_ts_now,
)
from gps_denied_onboard.helpers.lightglue_runtime import (
LightGlueConcurrentAccessError,
LightGlueRuntime,
@@ -84,6 +87,7 @@ __all__ = [
"adjoint",
"exp_map",
"is_valid_rotation",
"iso_ts_from_clock",
"iso_ts_now",
"log_map",
"make_imu_preintegrator",
@@ -1,27 +1,36 @@
"""UTC ISO-8601 timestamp helper (E-CC-HELPERS / AZ-264 / AZ-508).
"""UTC ISO-8601 timestamp helpers (E-CC-HELPERS / AZ-264 / AZ-508 + AZ-526).
Single Layer-1 source for the wall-clock string used in the FDR record
``ts`` envelope across c6_tile_cache and c7_inference. Consolidates what
used to be a private ``_iso_ts_now`` one-liner repeated per module.
Single Layer-1 source for the FDR record ``ts`` envelope strings across
every component. Two variants:
The output format is locked to RFC 3339 / ISO 8601 UTC with microsecond
precision and a ``Z`` suffix, matching the canonical FDR ``ts`` fixture
in ``tests/unit/test_az272_fdr_record_schema.py`` (``_TS =
"2026-05-11T00:00:00.000000Z"``) and the format produced by the three
existing local helpers this module replaces. Changing the format would
alter FDR record bit-shape and is explicitly out of scope for AZ-508.
- :func:`iso_ts_now` — uses :func:`datetime.now` (wall clock); 6-digit
microsecond precision; for components that don't have a ``Clock``
injected (the c6_tile_cache and c7_inference FDR producers).
- :func:`iso_ts_from_clock` — uses an injected :class:`Clock` Protocol;
9-digit nanosecond precision; for components that already accept a
``Clock`` constructor parameter (the c2_vpr strategies, the FAISS
bridge, the C12 operator-reloc service, and the future C4 / C5 batches).
Producers that need a Clock-injected payload field (e.g.
``age_seconds`` derived from an injected wall-clock for testability)
MUST NOT route through this helper — it is purely metadata about WHEN
the FDR record envelope itself was emitted.
Both produce RFC 3339 UTC with a ``Z`` suffix, matching the canonical
FDR ``ts`` fixture in ``tests/unit/test_az272_fdr_record_schema.py``
(``_TS = "2026-05-11T00:00:00.000000Z"``). Changing the format would
alter FDR record bit-shape and is explicitly out of scope.
Producers that need a Clock-injected payload field (e.g. ``age_seconds``
derived from an injected wall-clock for testability) MUST NOT route
through these helpers — they are purely metadata about WHEN the FDR
record envelope itself was emitted.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING
__all__ = ["iso_ts_now"]
if TYPE_CHECKING:
from gps_denied_onboard.clock import Clock
__all__ = ["iso_ts_from_clock", "iso_ts_now"]
def iso_ts_now() -> str:
@@ -31,3 +40,17 @@ def iso_ts_now() -> str:
monotonic under a non-decreasing wall clock).
"""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def iso_ts_from_clock(clock: Clock) -> str:
"""Return an RFC 3339 UTC timestamp built from ``clock.time_ns()``.
Format: ``YYYY-MM-DDTHH:MM:SS.fffffffffZ`` (9-digit nanosecond
precision, ``Z`` suffix). Use when the timestamp must follow an
injectable :class:`~gps_denied_onboard.clock.Clock` (replay tests,
deterministic fixtures); use :func:`iso_ts_now` otherwise.
"""
ns = int(clock.time_ns())
seconds, fraction_ns = divmod(ns, 1_000_000_000)
dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
return f"{dt.strftime('%Y-%m-%dT%H:%M:%S')}.{fraction_ns:09d}Z"