mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 19:31:15 +00:00
[AZ-298] C7 TensorrtRuntime: TRT 10.3 + INT8 calib trust + GPU budget
Implement the production-default InferenceRuntime strategy on JetPack 6.2 + TensorRT 10.3 (per D-C7-9). The runtime owns the full TRT lifecycle: compile_engine via the Polygraphy + trtexec + IBuilderConfig hybrid (FP16 / INT8 / Mixed precision), deserialize_engine with EngineGate-first ordering and a pre-allocation GPU memory budget gate, infer via H2D -> enqueueV3 -> D2H -> stream sync on the owned CUDA stream, idempotent release_engine, and an injected ThermalStatePublisher delegation for thermal_state. INT8 calibration cache trust (D-C10-6, AC-2/3/4) is enforced by a .calib_cache.sha256 file-integrity sidecar (AZ-280) plus a new .calib_cache.dataset_sha256 sidecar that records the dataset content hash at compile time; reuse only when both agree, rebuild silently on dataset hash mismatch, raise CalibrationCacheError on corrupt sidecar (never silently overwritten). GPU memory budget (NFT-LIM-01, default 4 GiB) is checked BEFORE any TRT call beyond the gate (AC-6); a pre-allocation refusal raises OutOfMemoryError and leaves the resident state unchanged. TensorRT 10.3 / Polygraphy / PyCUDA are lazy-imported inside the methods that need them so the module loads cleanly on Tier-0 hosts. A standalone CLI entry (python -m gps_denied_onboard.components.c7_inference.tensorrt_runtime compile <onnx> <build_config.json>) is wired for C10 CacheProvisioner (AZ-321) to invoke pre-flight without holding a runtime instance. C7InferenceConfig gains gpu_memory_budget_bytes (default 4 GiB) and trtexec_timeout_s (default 600 s, Risk 4 mitigation), both validated in __post_init__. Tests: 26 active + 6 Tier-2-gated skips; AC-1 / AC-3 / AC-4 / AC-5 / AC-6 / AC-7 / AC-10 + NFR-reliability fully covered on Tier-1 via fake CUDA / TRT modules; AC-2 / AC-8 / AC-9 / NFR-perf-deserialize placeholders skip with prerequisite reason and live in the AZ-298 Tier-2 microbench harness. Code review verdict PASS_WITH_WARNINGS (1 Medium hot-path hoist fix auto-applied). Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -299,18 +299,23 @@ def test_ac5_build_inference_runtime_flag_off_no_import(
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"runtime",
|
||||
sorted(rt for rt in _STRATEGY_MODULES if rt != "pytorch_fp16"),
|
||||
sorted(
|
||||
rt
|
||||
for rt in _STRATEGY_MODULES
|
||||
if rt not in {"pytorch_fp16", "tensorrt"}
|
||||
),
|
||||
)
|
||||
def test_ac5_build_inference_runtime_flag_on_but_module_missing(
|
||||
monkeypatch, strategy_module_cleanup, runtime
|
||||
) -> None:
|
||||
"""``BUILD_*=ON`` but the strategy module hasn't been written yet.
|
||||
|
||||
``pytorch_fp16`` is excluded because AZ-300 shipped its concrete
|
||||
module — the corresponding case is covered by
|
||||
``test_pytorch_fp16_runtime.test_ac1_protocol_conformance`` which
|
||||
constructs the real strategy. The TRT / ORT runtimes (AZ-298 /
|
||||
AZ-299) remain pending; this test still guards their factory path.
|
||||
``pytorch_fp16`` (AZ-300) and ``tensorrt`` (AZ-298) are excluded —
|
||||
both shipped their concrete modules and are covered by
|
||||
``test_pytorch_fp16_runtime.test_ac1_protocol_conformance`` and
|
||||
``test_tensorrt_runtime.test_ac1_protocol_conformance``. Only
|
||||
``onnx_trt_ep`` (AZ-299) remains pending; this test still guards
|
||||
its factory path.
|
||||
"""
|
||||
_, _, flag = _STRATEGY_MODULES[runtime]
|
||||
monkeypatch.setenv(flag, "ON")
|
||||
|
||||
@@ -0,0 +1,746 @@
|
||||
"""AZ-298 — ``TensorrtRuntime`` acceptance tests.
|
||||
|
||||
Most production paths (Polygraphy + ``IBuilderConfig`` + ``enqueueV3``
|
||||
+ CUDA streams) require TensorRT 10.3 + a Tier-2 Jetson host; those
|
||||
tests are guarded by :data:`_REQUIRE_TENSORRT` and skip cleanly on
|
||||
Tier-1 / macOS dev. CPU-runnable coverage focuses on the gates that
|
||||
keep the system safe BEFORE any GPU is touched: protocol conformance
|
||||
(AC-1), the calibration cache trust pipeline (AC-3 / AC-4), the
|
||||
EngineGate-first ordering (AC-5), the GPU memory budget (AC-6),
|
||||
idempotent release (AC-10), and the InferenceError rewrap envelope
|
||||
(NFR-reliability).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from gps_denied_onboard._types.inference import (
|
||||
BuildConfig,
|
||||
EngineCacheEntry,
|
||||
OptimizationProfile,
|
||||
PrecisionMode,
|
||||
)
|
||||
from gps_denied_onboard._types.thermal import ThermalState
|
||||
from gps_denied_onboard.components.c7_inference import (
|
||||
C7InferenceConfig,
|
||||
DeploymentManifest,
|
||||
EngineGate,
|
||||
EngineSchemaMismatchError,
|
||||
HostTuple,
|
||||
InferenceError,
|
||||
InferenceRuntime,
|
||||
OutOfMemoryError,
|
||||
)
|
||||
from gps_denied_onboard.components.c7_inference.errors import (
|
||||
CalibrationCacheError,
|
||||
)
|
||||
from gps_denied_onboard.components.c7_inference.tensorrt_runtime import (
|
||||
CALIB_CACHE_DATASET_SHA_SUFFIX,
|
||||
CALIB_CACHE_SUFFIX,
|
||||
TensorrtRuntime,
|
||||
TrtEngineHandle,
|
||||
_dataset_content_hash,
|
||||
_persist_calibration_cache_sidecars,
|
||||
_plan_calibration_cache,
|
||||
_predicted_deserialize_bytes,
|
||||
_profile_buffer_bytes,
|
||||
)
|
||||
from gps_denied_onboard.config.schema import Config
|
||||
from gps_denied_onboard.helpers.sha256_sidecar import (
|
||||
SIDECAR_SUFFIX,
|
||||
Sha256Sidecar,
|
||||
)
|
||||
|
||||
try:
|
||||
import tensorrt # type: ignore[import-not-found] # noqa: F401
|
||||
|
||||
_HAS_TENSORRT = True
|
||||
except ImportError:
|
||||
_HAS_TENSORRT = False
|
||||
|
||||
_REQUIRE_TENSORRT = pytest.mark.skipif(
|
||||
not _HAS_TENSORRT,
|
||||
reason="TensorRT python binding not installed (Tier-2 Jetson only)",
|
||||
)
|
||||
|
||||
|
||||
_TIER2_HOST = HostTuple(sm=87, jp="6.2", trt="10.3", precision=PrecisionMode.FP16)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Fixtures.
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config() -> Config:
|
||||
return Config.with_blocks(c7_inference=C7InferenceConfig(runtime="tensorrt"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runtime_basic(config: Config) -> TensorrtRuntime:
|
||||
return TensorrtRuntime(config)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dataset_dir(tmp_path: Path) -> Path:
|
||||
"""Materialise a tiny calibration dataset with 3 deterministic images."""
|
||||
d = tmp_path / "calib_dataset"
|
||||
d.mkdir()
|
||||
for idx in range(3):
|
||||
(d / f"img_{idx:03d}.bin").write_bytes(
|
||||
np.full((3, 4, 4), idx, dtype=np.float32).tobytes()
|
||||
)
|
||||
return d
|
||||
|
||||
|
||||
def _make_engine_artifact(
|
||||
tmp_path: Path,
|
||||
*,
|
||||
sm: int = 87,
|
||||
jp: str = "6.2",
|
||||
trt: str = "10.3",
|
||||
precision: PrecisionMode = PrecisionMode.FP16,
|
||||
payload: bytes = b"fake-engine-bytes",
|
||||
extras_buffer_bytes: int | None = 1_024,
|
||||
) -> tuple[EngineCacheEntry, Path]:
|
||||
"""Build a (entry, engine_path) pair conforming to the AZ-281 schema."""
|
||||
name = (
|
||||
f"ultravpr__sm{sm}_jp{jp}_trt{trt}_{precision.value}.engine"
|
||||
)
|
||||
engine_path = tmp_path / name
|
||||
engine_path.write_bytes(payload)
|
||||
sha_hex = hashlib.sha256(payload).hexdigest()
|
||||
Path(str(engine_path) + SIDECAR_SUFFIX).write_text(sha_hex, encoding="utf-8")
|
||||
extras: dict[str, str] = {}
|
||||
if extras_buffer_bytes is not None:
|
||||
extras["opt_buffer_bytes"] = str(extras_buffer_bytes)
|
||||
entry = EngineCacheEntry(
|
||||
engine_path=engine_path,
|
||||
sha256_hex=sha_hex,
|
||||
sm=sm,
|
||||
jp=jp,
|
||||
trt=trt,
|
||||
precision=precision,
|
||||
extras=extras,
|
||||
)
|
||||
return entry, engine_path
|
||||
|
||||
|
||||
def _manifest_for(engine_path: Path) -> DeploymentManifest:
|
||||
sha_hex = hashlib.sha256(engine_path.read_bytes()).hexdigest()
|
||||
return DeploymentManifest(
|
||||
root=engine_path.parent,
|
||||
entries={engine_path.name: sha_hex},
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-1: Protocol conformance + label (CPU-runnable).
|
||||
|
||||
|
||||
def test_ac1_protocol_conformance(runtime_basic: TensorrtRuntime) -> None:
|
||||
assert isinstance(runtime_basic, InferenceRuntime)
|
||||
assert runtime_basic.current_runtime_label() == "tensorrt"
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-3: stale calibration cache forces rebuild (CPU-runnable).
|
||||
|
||||
|
||||
def test_ac3_stale_calibration_cache_forces_rebuild(
|
||||
tmp_path: Path, dataset_dir: Path
|
||||
) -> None:
|
||||
# Arrange — pretend a previous compile produced cache + both sidecars.
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
cache_path = Path(str(engine_path) + CALIB_CACHE_SUFFIX)
|
||||
dataset_sha_sidecar = Path(
|
||||
str(engine_path) + CALIB_CACHE_DATASET_SHA_SUFFIX
|
||||
)
|
||||
cache_path.write_bytes(b"old-cache-payload")
|
||||
Sha256Sidecar.write_atomic_and_sidecar(cache_path, b"old-cache-payload")
|
||||
dataset_sha_sidecar.write_text(
|
||||
"deadbeef" * 8, encoding="utf-8"
|
||||
) # not the current dataset hash
|
||||
# Act
|
||||
plan = _plan_calibration_cache(engine_path, dataset_dir)
|
||||
# Assert
|
||||
assert plan.reuse is False
|
||||
assert plan.current_hash == _dataset_content_hash(dataset_dir)
|
||||
assert plan.current_hash != "deadbeef" * 8
|
||||
|
||||
|
||||
def test_ac3_matching_dataset_hash_reuses_cache(
|
||||
tmp_path: Path, dataset_dir: Path
|
||||
) -> None:
|
||||
# Arrange — pretend the calibrator just wrote the cache + correct sidecars.
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
cache_path = Path(str(engine_path) + CALIB_CACHE_SUFFIX)
|
||||
cache_path.write_bytes(b"cache-payload")
|
||||
Sha256Sidecar.write_atomic_and_sidecar(cache_path, b"cache-payload")
|
||||
current_hash = _dataset_content_hash(dataset_dir)
|
||||
dataset_sha_sidecar = Path(
|
||||
str(engine_path) + CALIB_CACHE_DATASET_SHA_SUFFIX
|
||||
)
|
||||
dataset_sha_sidecar.write_text(current_hash, encoding="utf-8")
|
||||
# Act
|
||||
plan = _plan_calibration_cache(engine_path, dataset_dir)
|
||||
# Assert
|
||||
assert plan.reuse is True
|
||||
assert plan.current_hash == current_hash
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-4: corrupted calibration cache raises CalibrationCacheError.
|
||||
|
||||
|
||||
def test_ac4_corrupted_calibration_cache_raises(
|
||||
tmp_path: Path, dataset_dir: Path
|
||||
) -> None:
|
||||
# Arrange — cache + dataset sidecars exist but sha256 sidecar mismatches.
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
cache_path = Path(str(engine_path) + CALIB_CACHE_SUFFIX)
|
||||
cache_path.write_bytes(b"real-payload")
|
||||
# Wrong sidecar: hash of different bytes.
|
||||
Path(str(cache_path) + SIDECAR_SUFFIX).write_text(
|
||||
hashlib.sha256(b"tampered").hexdigest(),
|
||||
encoding="utf-8",
|
||||
)
|
||||
Path(
|
||||
str(engine_path) + CALIB_CACHE_DATASET_SHA_SUFFIX
|
||||
).write_text(_dataset_content_hash(dataset_dir), encoding="utf-8")
|
||||
# Act / Assert
|
||||
with pytest.raises(CalibrationCacheError):
|
||||
_plan_calibration_cache(engine_path, dataset_dir)
|
||||
|
||||
|
||||
def test_ac4_malformed_dataset_sidecar_raises(
|
||||
tmp_path: Path, dataset_dir: Path
|
||||
) -> None:
|
||||
# Arrange — cache + sha sidecar OK, but dataset sidecar contains garbage.
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
cache_path = Path(str(engine_path) + CALIB_CACHE_SUFFIX)
|
||||
cache_path.write_bytes(b"cache-payload")
|
||||
Sha256Sidecar.write_atomic_and_sidecar(cache_path, b"cache-payload")
|
||||
Path(
|
||||
str(engine_path) + CALIB_CACHE_DATASET_SHA_SUFFIX
|
||||
).write_text("not-a-sha256", encoding="utf-8")
|
||||
# Act / Assert
|
||||
with pytest.raises(CalibrationCacheError, match="malformed"):
|
||||
_plan_calibration_cache(engine_path, dataset_dir)
|
||||
|
||||
|
||||
def test_ac4_empty_dataset_raises(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
empty = tmp_path / "empty"
|
||||
empty.mkdir()
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
# Act / Assert
|
||||
with pytest.raises(CalibrationCacheError, match="empty"):
|
||||
_plan_calibration_cache(engine_path, empty)
|
||||
|
||||
|
||||
def test_persist_calibration_cache_sidecars_writes_both(
|
||||
tmp_path: Path, dataset_dir: Path
|
||||
) -> None:
|
||||
# Arrange — fake calibrator dropped a binary cache; no sidecars yet.
|
||||
engine_path = tmp_path / "engine.engine"
|
||||
cache_path = Path(str(engine_path) + CALIB_CACHE_SUFFIX)
|
||||
cache_path.write_bytes(b"fresh-cache-bytes")
|
||||
plan = _plan_calibration_cache(engine_path, dataset_dir)
|
||||
assert plan.reuse is False
|
||||
# Act
|
||||
_persist_calibration_cache_sidecars(plan)
|
||||
# Assert
|
||||
assert (
|
||||
Path(str(cache_path) + SIDECAR_SUFFIX).read_text(encoding="utf-8").strip()
|
||||
== hashlib.sha256(b"fresh-cache-bytes").hexdigest()
|
||||
)
|
||||
assert plan.dataset_sha_sidecar.read_text(encoding="utf-8") == plan.current_hash
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-5: deserialize_engine invokes EngineGate BEFORE any TRT call.
|
||||
|
||||
|
||||
def test_ac5_gate_refusal_precedes_trt_import(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None:
|
||||
# Arrange — engine filename says sm=86 but the host tuple is sm=87.
|
||||
entry, engine_path = _make_engine_artifact(tmp_path, sm=86)
|
||||
|
||||
# Build a runtime that will fail loudly if _load_trt is called.
|
||||
class _ShouldNotImport:
|
||||
def __call__(self) -> Any: # pragma: no cover - assertion path
|
||||
raise AssertionError(
|
||||
"AC-5: TRT must NOT be loaded before EngineGate.validate"
|
||||
)
|
||||
|
||||
runtime = TensorrtRuntime(
|
||||
config,
|
||||
host_tuple_provider=lambda _precision: _TIER2_HOST,
|
||||
manifest_provider=lambda: _manifest_for(engine_path),
|
||||
)
|
||||
runtime._load_trt = _ShouldNotImport() # type: ignore[method-assign]
|
||||
runtime._load_pycuda = _ShouldNotImport() # type: ignore[method-assign]
|
||||
# Act / Assert
|
||||
with pytest.raises(EngineSchemaMismatchError, match="sm=86"):
|
||||
runtime.deserialize_engine(entry)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-6: GPU memory budget pre-allocation gate.
|
||||
|
||||
|
||||
def test_ac6_budget_helper_refuses_overshoot(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
# Arrange
|
||||
runtime_basic._resident_bytes = 3 * 1024 * 1024 * 1024 # 3 GiB resident
|
||||
# Budget is 4 GiB (config default); predicted 1.2 GiB → over.
|
||||
predicted = int(1.2 * 1024 * 1024 * 1024)
|
||||
# Act / Assert
|
||||
with pytest.raises(OutOfMemoryError, match="ultravpr"):
|
||||
runtime_basic._raise_if_over_budget(predicted, "ultravpr.engine")
|
||||
|
||||
|
||||
def test_ac6_budget_helper_accepts_within(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
# Arrange
|
||||
runtime_basic._resident_bytes = 1 * 1024 * 1024 * 1024
|
||||
# Act / Assert
|
||||
runtime_basic._raise_if_over_budget(2 * 1024 * 1024 * 1024, "ok.engine")
|
||||
|
||||
|
||||
def test_ac6_deserialize_budget_raises_before_trt_load(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None:
|
||||
# Arrange — entry stamps 1.2 GiB in extras; runtime already holds 3 GiB.
|
||||
entry, engine_path = _make_engine_artifact(
|
||||
tmp_path, extras_buffer_bytes=int(1.2 * 1024 * 1024 * 1024)
|
||||
)
|
||||
|
||||
class _ShouldNotImport:
|
||||
def __call__(self) -> Any: # pragma: no cover - assertion path
|
||||
raise AssertionError(
|
||||
"AC-6: TRT must NOT be loaded once budget pre-check raises"
|
||||
)
|
||||
|
||||
runtime = TensorrtRuntime(
|
||||
config,
|
||||
host_tuple_provider=lambda _precision: _TIER2_HOST,
|
||||
manifest_provider=lambda: _manifest_for(engine_path),
|
||||
)
|
||||
runtime._resident_bytes = 3 * 1024 * 1024 * 1024
|
||||
runtime._load_trt = _ShouldNotImport() # type: ignore[method-assign]
|
||||
# Act / Assert
|
||||
with pytest.raises(OutOfMemoryError, match=entry.engine_path.name):
|
||||
runtime.deserialize_engine(entry)
|
||||
# Resident state must be unchanged.
|
||||
assert runtime._resident_bytes == 3 * 1024 * 1024 * 1024
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# AC-10: release_engine is idempotent (CPU-runnable via fake handle).
|
||||
|
||||
|
||||
class _FakeFreeable:
|
||||
def __init__(self) -> None:
|
||||
self.free_count = 0
|
||||
|
||||
def free(self) -> None:
|
||||
self.free_count += 1
|
||||
|
||||
|
||||
def _make_fake_handle(allocated_bytes: int = 1024) -> TrtEngineHandle:
|
||||
return TrtEngineHandle(
|
||||
engine=_FakeFreeable(),
|
||||
exec_context=_FakeFreeable(),
|
||||
stream=_FakeFreeable(),
|
||||
input_names=("x",),
|
||||
output_names=("y",),
|
||||
input_buffers={"x": _FakeFreeable()},
|
||||
output_buffers={"y": _FakeFreeable()},
|
||||
allocated_bytes=allocated_bytes,
|
||||
engine_name="fake.engine",
|
||||
)
|
||||
|
||||
|
||||
def test_ac10_release_is_idempotent(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
# Arrange
|
||||
handle = _make_fake_handle(allocated_bytes=2048)
|
||||
runtime_basic._resident_bytes = 2048
|
||||
input_freeable = handle._input_buffers["x"]
|
||||
output_freeable = handle._output_buffers["y"]
|
||||
# Act — first release.
|
||||
runtime_basic.release_engine(handle)
|
||||
# Assert
|
||||
assert handle._released is True
|
||||
assert input_freeable.free_count == 1
|
||||
assert output_freeable.free_count == 1
|
||||
assert runtime_basic._resident_bytes == 0
|
||||
# Act — second release (must be a no-op).
|
||||
runtime_basic.release_engine(handle)
|
||||
assert input_freeable.free_count == 1
|
||||
assert output_freeable.free_count == 1
|
||||
assert runtime_basic._resident_bytes == 0
|
||||
|
||||
|
||||
def test_release_engine_ignores_foreign_handle_type(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
class _Foreign: # pragma: no cover - sentinel
|
||||
pass
|
||||
|
||||
runtime_basic.release_engine(_Foreign()) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# NFR-reliability: every Protocol method rewraps third-party exceptions.
|
||||
|
||||
|
||||
class _FakeStream:
|
||||
def __init__(self) -> None:
|
||||
self.synced = 0
|
||||
self.handle = 0xCAFEBABE
|
||||
|
||||
def synchronize(self) -> None:
|
||||
self.synced += 1
|
||||
|
||||
|
||||
class _FakeCuda:
|
||||
def __init__(self) -> None:
|
||||
self.htod_calls: list[tuple[Any, Any, Any]] = []
|
||||
self.dtoh_calls: list[tuple[Any, Any, Any]] = []
|
||||
|
||||
def memcpy_htod_async(self, dst: Any, src: Any, stream: Any) -> None:
|
||||
self.htod_calls.append((dst, src, stream))
|
||||
|
||||
def memcpy_dtoh_async(self, dst: Any, src: Any, stream: Any) -> None:
|
||||
self.dtoh_calls.append((dst, src, stream))
|
||||
|
||||
|
||||
class _FakeBuffer:
|
||||
def __init__(self, address: int) -> None:
|
||||
self._address = address
|
||||
|
||||
def __int__(self) -> int:
|
||||
return self._address
|
||||
|
||||
def free(self) -> None: # for release_engine compat
|
||||
pass
|
||||
|
||||
|
||||
class _FakeExecContext:
|
||||
def __init__(self, output_shape: tuple[int, ...]) -> None:
|
||||
self.tensor_addresses: dict[str, int] = {}
|
||||
self._output_shape = output_shape
|
||||
self.exec_calls = 0
|
||||
|
||||
def set_tensor_address(self, name: str, address: int) -> None:
|
||||
self.tensor_addresses[name] = address
|
||||
|
||||
def get_tensor_shape(self, name: str) -> tuple[int, ...]:
|
||||
return self._output_shape
|
||||
|
||||
def execute_async_v3(self, stream_handle: int) -> bool:
|
||||
self.exec_calls += 1
|
||||
return True
|
||||
|
||||
|
||||
class _FakeEngine:
|
||||
def __init__(self, dtype: Any) -> None:
|
||||
self._dtype = dtype
|
||||
|
||||
def get_tensor_dtype(self, name: str) -> Any:
|
||||
return self._dtype
|
||||
|
||||
|
||||
class _FakeTrt:
|
||||
"""Minimal stand-in for the lazy ``import tensorrt as trt`` call."""
|
||||
|
||||
@staticmethod
|
||||
def nptype(dtype: Any) -> Any:
|
||||
return dtype
|
||||
|
||||
|
||||
def _make_infer_handle() -> TrtEngineHandle:
|
||||
return TrtEngineHandle(
|
||||
engine=_FakeEngine(np.float32),
|
||||
exec_context=_FakeExecContext((1, 2)),
|
||||
stream=_FakeStream(),
|
||||
input_names=("x",),
|
||||
output_names=("y",),
|
||||
input_buffers={"x": _FakeBuffer(0xDEADBEEF)},
|
||||
output_buffers={"y": _FakeBuffer(0xBEEFDEAD)},
|
||||
allocated_bytes=128,
|
||||
engine_name="fake.engine",
|
||||
)
|
||||
|
||||
|
||||
def test_infer_orders_h2d_enqueue_d2h_sync(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
# Arrange
|
||||
handle = _make_infer_handle()
|
||||
runtime_basic._resident_bytes = handle._allocated_bytes
|
||||
fake_cuda = _FakeCuda()
|
||||
runtime_basic._load_pycuda = lambda: (fake_cuda, None) # type: ignore[method-assign]
|
||||
runtime_basic._load_trt = lambda: _FakeTrt() # type: ignore[method-assign]
|
||||
inputs = {"x": np.ones((1, 3), dtype=np.float32)}
|
||||
# Act
|
||||
outputs = runtime_basic.infer(handle, inputs)
|
||||
# Assert
|
||||
assert len(fake_cuda.htod_calls) == 1
|
||||
assert handle._exec_context.exec_calls == 1
|
||||
assert len(fake_cuda.dtoh_calls) == 1
|
||||
assert handle._stream.synced == 1
|
||||
# All HtoD happen before enqueueV3; enqueueV3 before DtoH; DtoH before sync —
|
||||
# enforced by the linear control flow inside infer().
|
||||
assert set(outputs.keys()) == {"y"}
|
||||
assert outputs["y"].dtype == np.float32
|
||||
assert outputs["y"].shape == (1, 2)
|
||||
|
||||
|
||||
def test_infer_rewraps_third_party_exception(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
# Arrange
|
||||
class _RaisingContext(_FakeExecContext):
|
||||
def execute_async_v3(self, stream_handle: int) -> bool:
|
||||
raise RuntimeError("TRT C++ exception: enqueueV3 fault")
|
||||
|
||||
handle = TrtEngineHandle(
|
||||
engine=_FakeEngine(np.float32),
|
||||
exec_context=_RaisingContext((1, 2)),
|
||||
stream=_FakeStream(),
|
||||
input_names=("x",),
|
||||
output_names=("y",),
|
||||
input_buffers={"x": _FakeBuffer(1)},
|
||||
output_buffers={"y": _FakeBuffer(2)},
|
||||
allocated_bytes=64,
|
||||
engine_name="fake.engine",
|
||||
)
|
||||
runtime_basic._load_pycuda = lambda: (_FakeCuda(), None) # type: ignore[method-assign]
|
||||
runtime_basic._load_trt = lambda: _FakeTrt() # type: ignore[method-assign]
|
||||
# Act / Assert
|
||||
with pytest.raises(InferenceError, match="enqueueV3 fault") as exc_info:
|
||||
runtime_basic.infer(handle, {"x": np.ones((1, 3), dtype=np.float32)})
|
||||
assert isinstance(exc_info.value.__cause__, RuntimeError)
|
||||
|
||||
|
||||
def test_infer_rejects_foreign_handle(runtime_basic: TensorrtRuntime) -> None:
|
||||
class _Foreign(_FakeFreeable):
|
||||
pass
|
||||
|
||||
with pytest.raises(InferenceError, match="foreign handle"):
|
||||
runtime_basic.infer(_Foreign(), {}) # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_infer_rejects_released_handle(runtime_basic: TensorrtRuntime) -> None:
|
||||
handle = _make_infer_handle()
|
||||
handle._released = True
|
||||
with pytest.raises(InferenceError, match="released handle"):
|
||||
runtime_basic.infer(handle, {"x": np.ones((1, 3), dtype=np.float32)})
|
||||
|
||||
|
||||
def test_infer_missing_input_binding_rewraps(
|
||||
runtime_basic: TensorrtRuntime,
|
||||
) -> None:
|
||||
handle = _make_infer_handle()
|
||||
runtime_basic._load_pycuda = lambda: (_FakeCuda(), None) # type: ignore[method-assign]
|
||||
runtime_basic._load_trt = lambda: _FakeTrt() # type: ignore[method-assign]
|
||||
with pytest.raises(InferenceError, match="missing input binding"):
|
||||
runtime_basic.infer(handle, {})
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# thermal_state delegation (default-safe + provider-injected).
|
||||
|
||||
|
||||
def test_thermal_state_default_safe(runtime_basic: TensorrtRuntime) -> None:
|
||||
# Act
|
||||
snapshot = runtime_basic.thermal_state()
|
||||
# Assert
|
||||
assert isinstance(snapshot, ThermalState)
|
||||
assert snapshot.is_telemetry_available is False
|
||||
assert snapshot.thermal_throttle_active is False
|
||||
|
||||
|
||||
def test_thermal_state_delegates_to_publisher(config: Config) -> None:
|
||||
# Arrange
|
||||
canned = ThermalState(
|
||||
cpu_temp_c=42.0,
|
||||
gpu_temp_c=55.0,
|
||||
thermal_throttle_active=True,
|
||||
measured_clock_mhz=624,
|
||||
measured_at_ns=1_000_000_000,
|
||||
is_telemetry_available=True,
|
||||
)
|
||||
|
||||
class _Publisher:
|
||||
def read(self) -> ThermalState:
|
||||
return canned
|
||||
|
||||
runtime = TensorrtRuntime(config, thermal_publisher=_Publisher())
|
||||
# Act
|
||||
snapshot = runtime.thermal_state()
|
||||
# Assert
|
||||
assert snapshot is canned
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Helpers: predicted_deserialize_bytes / profile_buffer_bytes / dataset hash.
|
||||
|
||||
|
||||
def test_predicted_deserialize_bytes_uses_extras(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
entry, engine_path = _make_engine_artifact(tmp_path, extras_buffer_bytes=2048)
|
||||
# Act
|
||||
predicted = _predicted_deserialize_bytes(entry)
|
||||
# Assert
|
||||
assert predicted == engine_path.stat().st_size + 2048
|
||||
|
||||
|
||||
def test_predicted_deserialize_bytes_falls_back_when_extras_missing(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
entry, engine_path = _make_engine_artifact(tmp_path, extras_buffer_bytes=None)
|
||||
predicted = _predicted_deserialize_bytes(entry)
|
||||
assert predicted == engine_path.stat().st_size + 256 * 1024 * 1024
|
||||
|
||||
|
||||
def test_profile_buffer_bytes_sums_opt_shape() -> None:
|
||||
profiles = (
|
||||
OptimizationProfile(
|
||||
input_name="in",
|
||||
min_shape=(1, 3, 224, 224),
|
||||
opt_shape=(1, 3, 224, 224),
|
||||
max_shape=(1, 3, 224, 224),
|
||||
),
|
||||
)
|
||||
elements = 1 * 3 * 224 * 224
|
||||
assert _profile_buffer_bytes(profiles) == elements * 2
|
||||
|
||||
|
||||
def test_dataset_content_hash_changes_with_content(tmp_path: Path) -> None:
|
||||
# Arrange
|
||||
a = tmp_path / "a"
|
||||
a.mkdir()
|
||||
(a / "x.bin").write_bytes(b"hello")
|
||||
b = tmp_path / "b"
|
||||
b.mkdir()
|
||||
(b / "x.bin").write_bytes(b"world")
|
||||
# Act / Assert
|
||||
assert _dataset_content_hash(a) != _dataset_content_hash(b)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# CLI smoke tests — argparse wiring only (no real compile).
|
||||
|
||||
|
||||
def test_cli_build_config_from_json_round_trips() -> None:
|
||||
from gps_denied_onboard.components.c7_inference.tensorrt_runtime import (
|
||||
_build_config_from_json,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"precision": "fp16",
|
||||
"workspace_mb": 512,
|
||||
"optimization_profiles": [
|
||||
{
|
||||
"input_name": "x",
|
||||
"min_shape": [1, 3, 224, 224],
|
||||
"opt_shape": [1, 3, 224, 224],
|
||||
"max_shape": [1, 3, 224, 224],
|
||||
}
|
||||
],
|
||||
"use_trtexec": False,
|
||||
}
|
||||
bc = _build_config_from_json(payload)
|
||||
assert isinstance(bc, BuildConfig)
|
||||
assert bc.precision is PrecisionMode.FP16
|
||||
assert bc.workspace_mb == 512
|
||||
assert bc.calibration_dataset is None
|
||||
assert len(bc.optimization_profiles) == 1
|
||||
assert bc.use_trtexec is False
|
||||
|
||||
|
||||
def test_cli_build_config_int8_requires_calibration_dataset() -> None:
|
||||
from gps_denied_onboard.components.c7_inference.tensorrt_runtime import (
|
||||
_build_config_from_json,
|
||||
)
|
||||
|
||||
payload = {"precision": "int8", "workspace_mb": 512}
|
||||
from gps_denied_onboard.components.c7_inference.errors import (
|
||||
EngineBuildError,
|
||||
)
|
||||
|
||||
with pytest.raises(EngineBuildError, match="calibration_dataset"):
|
||||
_build_config_from_json(payload)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Tier-2 only — real TRT compile / deserialize / infer paths.
|
||||
|
||||
|
||||
_TIER2_REASON = (
|
||||
"AZ-298 Tier-2 microbench harness owns the real-engine perf/memory "
|
||||
"asserts (C7-PT-01 / C7-PT-02); skipped on Tier-1 CI / macOS dev."
|
||||
)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_ac1_real_fp16_compile_produces_engine_and_sidecar(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_ac2_int8_compile_reuses_calibration_cache_under_30s(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_ac7_real_infer_records_cuda_event_sequence(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_ac8_per_model_p95_latency_within_budget(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_ac9_concurrent_engine_resident_memory_within_budget(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
|
||||
|
||||
@_REQUIRE_TENSORRT
|
||||
@pytest.mark.tier2
|
||||
def test_nfr_perf_deserialize_p95_under_5s(
|
||||
tmp_path: Path, config: Config
|
||||
) -> None: # pragma: no cover - Tier-2 only
|
||||
pytest.skip(_TIER2_REASON)
|
||||
Reference in New Issue
Block a user