[AZ-390] [AZ-392] C8 FC/GCS adapter foundation + covariance projector

Adds the C8 foundation:
- FcAdapter / GcsAdapter / ReplaySink Protocols + contract DTOs in
  _types/fc.py (PortConfig, FcKind, FlightState, GpsStatus, Severity,
  TelemetryKind, FcTelemetryFrame, FlightStateSignal, GpsHealth,
  OperatorCommand, Subscription, Imu/Attitude samples).
- Disjoint FcAdapterError / GcsAdapterError trees with
  SourceSetSwitchNotSupportedError <: SourceSetSwitchError per AC-9.
- FcConfig + GcsConfig cross-cutting Config blocks with config-load
  validation (unknown strategy rejected at __post_init__).
- runtime_root/fc_factory.py: build_fc_adapter / build_gcs_adapter
  with BUILD_FC_*/BUILD_GCS_* flag gating + INFO log on load +
  single-writer outbound-thread binding.
- CovarianceProjector (helper, AZ-392): 6x6 -> 3x3 -> 2x2 ->
  sqrt(lambda_max) reduction; AP returns float m, iNav returns int mm
  with uint16 clamp + WARN + FDR record. Non-SPD / NaN / wrong-shape
  raise FcEmitError and emit an FDR ERROR record carrying frame_id.

Contracts:
- composition_root_protocol.md 1.1.0 -> 1.2.0 (added fc/gcs blocks +
  build_fc_adapter / build_gcs_adapter + outbound-thread binding).
- fc_adapter_protocol.md unchanged (this batch implements v1.0.0).

Tests: 410 pass / 2 skip / 0 fail (+53 new tests in batch 8).

AZ-391 (inbound subscription) deferred to batch 9 — pulls YAMSPy as
a new external dependency (iNav MSP2 decode).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Oleksandr Bezdieniezhnykh
2026-05-11 04:17:59 +03:00
parent e4ecdaf619
commit 362e93c626
22 changed files with 1909 additions and 59 deletions
@@ -0,0 +1,513 @@
"""AZ-390 — FcAdapter + GcsAdapter Protocols + DTOs + factories + composition.
Covers all 10 ACs:
1. Protocol conformance via @runtime_checkable
2. DTOs frozen + slots
3. Enum membership
4. Factory rejects build-flag OFF
5. Factory rejects unknown strategy at config-load
6. Single-writer outbound thread
7. GcsAdapter factory parallel coverage
8. Public API re-exports
9. Error hierarchy catchability
10. INFO log on build
"""
from __future__ import annotations
import dataclasses
import logging
import threading
from collections.abc import Callable
import pytest
from gps_denied_onboard._types.emitted import EmittedExternalPosition
from gps_denied_onboard._types.fc import (
AttitudeSample,
FcKind,
FcTelemetryFrame,
FlightState,
FlightStateSignal,
GpsHealth,
GpsStatus,
ImuTelemetrySample,
OperatorCommand,
PortConfig,
Severity,
Subscription,
TelemetryKind,
)
from gps_denied_onboard._types.pose import EstimatorOutput
from gps_denied_onboard.components.c8_fc_adapter import (
FcAdapter,
GcsAdapter,
ReplaySink,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import (
FcAdapterConfigError,
FcAdapterError,
FcEmitError,
FcOpenError,
GcsAdapterConfigError,
GcsAdapterError,
GcsEmitError,
SigningHandshakeError,
SourceSetSwitchError,
SourceSetSwitchNotSupportedError,
)
from gps_denied_onboard.config import Config, ConfigError, FcConfig, GcsConfig
from gps_denied_onboard.runtime_root.fc_factory import (
OutboundThreadAlreadyBoundError,
bind_outbound_emit_thread,
build_fc_adapter,
build_gcs_adapter,
clear_outbound_thread_binding,
clear_strategy_registries,
register_fc_adapter,
register_gcs_adapter,
)
@pytest.fixture(autouse=True)
def _isolate_factory_state() -> None:
# Arrange — every test starts from a clean registry + thread binding.
clear_strategy_registries()
clear_outbound_thread_binding()
yield
clear_strategy_registries()
clear_outbound_thread_binding()
# ----------------------------------------------------------------------
# AC-1: Protocol conformance
class _FcStub:
def open(self, port: PortConfig, signing_key: bytes | None) -> None: ...
def close(self) -> None: ...
def subscribe_telemetry(self, callback: Callable[[FcTelemetryFrame], None]) -> Subscription:
class _Sub:
def cancel(self) -> None: ...
return _Sub()
def emit_external_position(self, output: EstimatorOutput) -> EmittedExternalPosition:
return EmittedExternalPosition(
fc_kind=FcKind.ARDUPILOT_PLANE,
horiz_accuracy_m=1.0,
source_label=output.source_label,
emitted_at=0,
sequence_number=0,
)
def emit_status_text(self, msg: str, severity: Severity) -> None: ...
def request_source_set_switch(self) -> None: ...
def current_flight_state(self) -> FlightStateSignal:
return FlightStateSignal(
state=FlightState.INIT,
last_valid_gps_hint_wgs84=None,
last_valid_gps_age_ms=None,
captured_at=0,
)
class _GcsStub:
def open(self, port: PortConfig) -> None: ...
def close(self) -> None: ...
def emit_summary(self, output: EstimatorOutput) -> None: ...
def subscribe_operator_commands(
self, callback: Callable[[OperatorCommand], None]
) -> Subscription:
class _Sub:
def cancel(self) -> None: ...
return _Sub()
def emit_status_text(self, msg: str, severity: Severity) -> None: ...
def test_ac1_fc_protocol_conformance() -> None:
# Assert
assert isinstance(_FcStub(), FcAdapter)
def test_ac1_gcs_protocol_conformance() -> None:
# Assert
assert isinstance(_GcsStub(), GcsAdapter)
def test_ac1_replay_sink_protocol_conformance() -> None:
# Arrange
class _Sink:
def write(self, output: EstimatorOutput) -> None: ...
# Assert
assert isinstance(_Sink(), ReplaySink)
def test_ac1_protocol_rejects_missing_method() -> None:
# Arrange
class _Incomplete:
def open(self, port: PortConfig, signing_key: bytes | None) -> None: ...
def close(self) -> None: ...
# missing the other methods
# Assert
assert not isinstance(_Incomplete(), FcAdapter)
# ----------------------------------------------------------------------
# AC-2: DTOs frozen + slots
@pytest.mark.parametrize(
"dto, init_kwargs",
[
(PortConfig, {"device": "/dev/ttyTHS1", "baud": 921600, "fc_kind": FcKind.ARDUPILOT_PLANE}),
(
ImuTelemetrySample,
{"ts_ns": 0, "accel_xyz": (0.0, 0.0, 0.0), "gyro_xyz": (0.0, 0.0, 0.0)},
),
(AttitudeSample, {"ts_ns": 0, "roll_rad": 0.0, "pitch_rad": 0.0, "yaw_rad": 0.0}),
(GpsHealth, {"status": GpsStatus.STABLE, "fix_age_ms": 0, "captured_at": 0}),
(
FlightStateSignal,
{
"state": FlightState.INIT,
"last_valid_gps_hint_wgs84": None,
"last_valid_gps_age_ms": None,
"captured_at": 0,
},
),
(OperatorCommand, {"command": "test", "payload": {}, "received_at": 0}),
(
EmittedExternalPosition,
{
"fc_kind": FcKind.ARDUPILOT_PLANE,
"horiz_accuracy_m": 1.0,
"source_label": "visual_propagated",
"emitted_at": 0,
"sequence_number": 0,
},
),
],
)
def test_ac2_dto_frozen_and_slotted(dto: type, init_kwargs: dict) -> None:
# Arrange
instance = dto(**init_kwargs)
# Assert — frozen
with pytest.raises(dataclasses.FrozenInstanceError):
any_field = next(iter(init_kwargs))
setattr(instance, any_field, init_kwargs[any_field])
# Assert — slots
assert hasattr(dto, "__slots__")
assert len(dto.__slots__) > 0
def test_ac2_fc_telemetry_frame_dto_frozen() -> None:
# Arrange
frame = FcTelemetryFrame(
kind=TelemetryKind.IMU_SAMPLE,
payload=ImuTelemetrySample(ts_ns=0, accel_xyz=(0.0, 0.0, 0.0), gyro_xyz=(0.0, 0.0, 0.0)),
received_at=0,
signed=False,
)
# Assert
with pytest.raises(dataclasses.FrozenInstanceError):
frame.signed = True # type: ignore[misc]
assert hasattr(FcTelemetryFrame, "__slots__")
# ----------------------------------------------------------------------
# AC-3: Enum membership
def test_ac3_fc_kind_has_two_members() -> None:
# Assert
assert {m.name for m in FcKind} == {"ARDUPILOT_PLANE", "INAV"}
def test_ac3_flight_state_has_five_members() -> None:
# Assert
assert {m.name for m in FlightState} == {"INIT", "ARMED", "IN_FLIGHT", "ON_GROUND", "FAILED"}
def test_ac3_gps_status_has_five_members() -> None:
# Assert
assert {m.name for m in GpsStatus} == {
"NO_FIX",
"DEGRADED",
"STABLE",
"STABLE_NON_SPOOFED",
"SPOOFED",
}
def test_ac3_severity_values_mirror_mavlink() -> None:
# Assert
assert Severity.INFO.value == 6
assert Severity.WARNING.value == 4
assert Severity.ERROR.value == 3
def test_ac3_telemetry_kind_has_four_members() -> None:
# Assert
assert {m.name for m in TelemetryKind} == {
"IMU_SAMPLE",
"ATTITUDE",
"GPS_HEALTH",
"MAV_STATE",
}
# ----------------------------------------------------------------------
# AC-4: Factory rejects build-flag OFF
def test_ac4_fc_factory_rejects_build_flag_off(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "OFF")
register_fc_adapter("ardupilot_plane", lambda **_: _FcStub())
config = Config(fc=FcConfig(adapter="ardupilot_plane"))
# Act + Assert
with pytest.raises(FcAdapterConfigError, match=r"BUILD_FC_ARDUPILOT_PLANE is OFF"):
build_fc_adapter(config)
def test_ac4_fc_factory_passes_when_flag_on(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
register_fc_adapter("ardupilot_plane", lambda **_: _FcStub())
config = Config(fc=FcConfig(adapter="ardupilot_plane"))
# Act
adapter = build_fc_adapter(config)
# Assert
assert isinstance(adapter, FcAdapter)
# ----------------------------------------------------------------------
# AC-5: Factory rejects unknown strategy at config-load
def test_ac5_unknown_fc_strategy_rejected_at_config_load() -> None:
# Act + Assert — happens at construction time, not at build time
with pytest.raises(ConfigError, match=r"not in \['ardupilot_plane', 'inav'\]"):
FcConfig(adapter="garbage_fc")
def test_ac5_unknown_gcs_strategy_rejected_at_config_load() -> None:
# Act + Assert
with pytest.raises(ConfigError, match=r"not in \['qgc_mavlink'\]"):
GcsConfig(adapter="garbage_gcs")
def test_ac5_inav_signing_key_combination_rejected() -> None:
# Act + Assert — iNav with signing key is RESTRICT-COMM-2 violation
with pytest.raises(ConfigError, match=r"RESTRICT-COMM-2"):
FcConfig(adapter="inav", signing_key_source="ephemeral_per_flight")
def test_ac5_unregistered_strategy_rejected_at_build_with_clear_message() -> None:
# Arrange — strategy is in the known set but the binary didn't register a factory
config = Config(fc=FcConfig(adapter="inav", signing_key_source="none"))
# Act + Assert
with pytest.raises(FcAdapterConfigError, match=r"not registered"):
build_fc_adapter(config)
# ----------------------------------------------------------------------
# AC-6: Single-writer outbound thread
def test_ac6_first_bind_returns_thread_ident() -> None:
# Act
bound = bind_outbound_emit_thread()
# Assert
assert bound == threading.get_ident()
def test_ac6_second_bind_from_different_thread_rejected() -> None:
# Arrange
bind_outbound_emit_thread(thread_ident=1)
errors: list[BaseException] = []
def attempt_rebind() -> None:
try:
bind_outbound_emit_thread(thread_ident=2)
except OutboundThreadAlreadyBoundError as exc:
errors.append(exc)
# Act
t = threading.Thread(target=attempt_rebind)
t.start()
t.join()
# Assert
assert len(errors) == 1
assert isinstance(errors[0], RuntimeError)
def test_ac6_rebind_same_thread_idempotent() -> None:
# Arrange
first = bind_outbound_emit_thread(thread_ident=42)
# Act
second = bind_outbound_emit_thread(thread_ident=42)
# Assert — re-binding the SAME thread is idempotent (composition root may run twice in tests)
assert first == second == 42
# ----------------------------------------------------------------------
# AC-7: GcsAdapter factory parallel coverage
def test_ac7_gcs_factory_resolves_known_strategy(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "ON")
register_gcs_adapter("qgc_mavlink", lambda **_: _GcsStub())
config = Config(gcs=GcsConfig(adapter="qgc_mavlink"))
# Act
adapter = build_gcs_adapter(config)
# Assert
assert isinstance(adapter, GcsAdapter)
def test_ac7_gcs_factory_rejects_flag_off(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "OFF")
register_gcs_adapter("qgc_mavlink", lambda **_: _GcsStub())
config = Config(gcs=GcsConfig(adapter="qgc_mavlink"))
# Act + Assert
with pytest.raises(GcsAdapterConfigError, match=r"BUILD_GCS_QGC_MAVLINK is OFF"):
build_gcs_adapter(config)
# ----------------------------------------------------------------------
# AC-9: Error hierarchy catchability
def test_ac9_every_fc_error_is_fc_adapter_error() -> None:
# Assert — all FC errors share the base
for err_cls in [
FcOpenError,
FcEmitError,
SigningHandshakeError,
SourceSetSwitchError,
SourceSetSwitchNotSupportedError,
FcAdapterConfigError,
]:
assert issubclass(err_cls, FcAdapterError), err_cls
def test_ac9_source_set_switch_not_supported_is_subclass_of_switch_error() -> None:
# Assert
assert issubclass(SourceSetSwitchNotSupportedError, SourceSetSwitchError)
def test_ac9_gcs_errors_share_base() -> None:
# Assert
for err_cls in [GcsEmitError, GcsAdapterConfigError]:
assert issubclass(err_cls, GcsAdapterError), err_cls
def test_ac9_fc_and_gcs_trees_are_disjoint() -> None:
# Assert — catching FcAdapterError must NOT catch GcsAdapterError
assert not issubclass(GcsAdapterError, FcAdapterError)
assert not issubclass(FcAdapterError, GcsAdapterError)
# ----------------------------------------------------------------------
# AC-10: INFO log on build
def test_ac10_info_log_on_fc_build(
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
register_fc_adapter("ardupilot_plane", lambda **_: _FcStub())
config = Config(fc=FcConfig(adapter="ardupilot_plane", port_device="/dev/ttyS0"))
# Act
with caplog.at_level(logging.INFO, logger="runtime_root.fc_factory"):
build_fc_adapter(config)
# Assert — exactly one strategy_loaded record
matches = [
r for r in caplog.records if getattr(r, "kind", None) == "c8.adapter.strategy_loaded"
]
assert len(matches) == 1
assert matches[0].kv["strategy"] == "ardupilot_plane"
assert matches[0].kv["port_device"] == "/dev/ttyS0"
def test_ac10_info_log_on_gcs_build(
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
# Arrange
monkeypatch.setenv("BUILD_GCS_QGC_MAVLINK", "ON")
register_gcs_adapter("qgc_mavlink", lambda **_: _GcsStub())
config = Config(gcs=GcsConfig(adapter="qgc_mavlink", port_device="/dev/ttyS1"))
# Act
with caplog.at_level(logging.INFO, logger="runtime_root.fc_factory"):
build_gcs_adapter(config)
# Assert
matches = [r for r in caplog.records if getattr(r, "kind", None) == "c8.gcs.strategy_loaded"]
assert len(matches) == 1
# ----------------------------------------------------------------------
# NFR: build perf (loose budget — sanity check, not microbench)
def test_nfr_perf_fc_build_under_50ms(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
import time
monkeypatch.setenv("BUILD_FC_ARDUPILOT_PLANE", "ON")
register_fc_adapter("ardupilot_plane", lambda **_: _FcStub())
config = Config(fc=FcConfig(adapter="ardupilot_plane"))
# Act
start = time.monotonic()
build_fc_adapter(config)
elapsed_s = time.monotonic() - start
# Assert
assert elapsed_s < 0.05, f"build took {elapsed_s * 1000:.2f}ms (budget 50ms)"
# ----------------------------------------------------------------------
# Coverage of the FcConfig.signing_key_source validator
def test_signing_key_source_unknown_value_rejected() -> None:
# Act + Assert
with pytest.raises(ConfigError, match=r"signing_key_source"):
FcConfig(adapter="ardupilot_plane", signing_key_source="garbage")
def test_gcs_summary_rate_out_of_range_rejected() -> None:
# Act + Assert — too high
with pytest.raises(ConfigError, match=r"summary_rate_hz"):
GcsConfig(summary_rate_hz=5.0)
# Too low
with pytest.raises(ConfigError, match=r"summary_rate_hz"):
GcsConfig(summary_rate_hz=0.5)
@@ -0,0 +1,306 @@
"""AZ-392 — CovarianceProjector unit tests.
Covers all 7 ACs:
1. 6x6 -> 3x3 -> 2x2 projection correctness
2. AP returns meters (float)
3. iNav returns mm (uint16-clamped)
4. SPD violation raises FcEmitError + emits FDR ERROR
5. NaN guard
6. Bit-stable across calls (deterministic intermediate arithmetic)
7. iNav clamp at 65535 emits WARN log
"""
from __future__ import annotations
import logging
import math
from datetime import datetime, timezone
from unittest import mock
import numpy as np
import pytest
from gps_denied_onboard._types.pose import EstimatorHealth, EstimatorOutput
from gps_denied_onboard.components.c8_fc_adapter._covariance_projector import (
CovarianceProjector,
)
from gps_denied_onboard.components.c8_fc_adapter.errors import FcEmitError
from gps_denied_onboard.fdr_client.records import FdrRecord
def _output(cov: np.ndarray | None, frame_id: int = 7) -> EstimatorOutput:
return EstimatorOutput(
frame_id=frame_id,
timestamp=datetime.now(tz=timezone.utc),
pose_se3=np.eye(4),
covariance_6x6=cov,
source_label="visual_propagated",
health=EstimatorHealth(),
)
def _spd_6x6(sigma_xx: float = 4.0, sigma_yy: float = 9.0, sigma_xy: float = 0.0) -> np.ndarray:
"""Construct a 6x6 covariance whose 2x2 horizontal block has known eigenvalues."""
cov = np.eye(6)
cov[0, 0] = sigma_xx
cov[1, 1] = sigma_yy
cov[0, 1] = sigma_xy
cov[1, 0] = sigma_xy
return cov
def _fake_fdr_client() -> mock.MagicMock:
client = mock.MagicMock()
client.enqueue.return_value = "OK"
return client
# ----------------------------------------------------------------------
# AC-1: 6x6 -> 3x3 -> 2x2 reduction correctness
def test_ac1_diagonal_covariance_returns_largest_sigma() -> None:
# Arrange — sigma_yy > sigma_xx, so lambda_max == 9.0 and radius == 3.0
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=4.0, sigma_yy=9.0))
# Act
radius_m = proj.to_ardupilot_horiz_accuracy_m(out)
# Assert
assert math.isclose(radius_m, 3.0, abs_tol=1e-9)
def test_ac1_with_off_diagonal_uses_eigenvalue() -> None:
# Arrange — [[4, 1], [1, 4]] has eigenvalues 5 and 3; lambda_max = 5
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=4.0, sigma_yy=4.0, sigma_xy=1.0))
# Act
radius_m = proj.to_ardupilot_horiz_accuracy_m(out)
# Assert
assert math.isclose(radius_m, math.sqrt(5.0), rel_tol=1e-12)
# ----------------------------------------------------------------------
# AC-2: AP returns meters (float)
def test_ac2_ap_returns_float_meters() -> None:
# Arrange
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=1.0, sigma_yy=1.0))
# Act
result = proj.to_ardupilot_horiz_accuracy_m(out)
# Assert
assert isinstance(result, float)
assert math.isclose(result, 1.0, abs_tol=1e-12)
# ----------------------------------------------------------------------
# AC-3: iNav returns mm (int)
def test_ac3_inav_returns_int_millimeters() -> None:
# Arrange — sigma == 1 m^2 -> radius = 1 m = 1000 mm
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=1.0, sigma_yy=1.0))
# Act
result = proj.to_inav_h_pos_accuracy_mm(out)
# Assert
assert isinstance(result, int)
assert result == 1000
def test_ac3_inav_rounds_half_up() -> None:
# Arrange — radius = sqrt(0.5) ~ 0.7071067... m -> 707.1067 mm -> round-half-up -> 707
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=0.5, sigma_yy=0.5))
# Act
result = proj.to_inav_h_pos_accuracy_mm(out)
# Assert
assert result == 707
# ----------------------------------------------------------------------
# AC-4 + AC-5: SPD / NaN violations
def test_ac4_non_spd_raises_fc_emit_error_and_logs_fdr() -> None:
# Arrange — negative determinant 2x2 block
fdr = _fake_fdr_client()
proj = CovarianceProjector(fdr_client=fdr)
cov = _spd_6x6()
cov[0, 0] = -1.0 # break positive-definiteness
out = _output(cov)
# Act + Assert
with pytest.raises(FcEmitError, match=r"non-SPD"):
proj.to_ardupilot_horiz_accuracy_m(out)
# Assert — exactly one FDR ERROR record with the violation kv
assert fdr.enqueue.call_count == 1
record: FdrRecord = fdr.enqueue.call_args.args[0]
assert record.kind == "log"
assert record.payload["kv"]["reason"] == "non_spd"
assert record.payload["kv"]["frame_id"] == 7
def test_ac4_asymmetric_2x2_rejected() -> None:
# Arrange
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
cov = np.eye(6)
cov[0, 1] = 1.0
cov[1, 0] = 2.0 # ≠ cov[0, 1] → asymmetric
out = _output(cov)
# Act + Assert
with pytest.raises(FcEmitError, match=r"non-SPD"):
proj.to_ardupilot_horiz_accuracy_m(out)
def test_ac5_nan_covariance_rejected() -> None:
# Arrange
fdr = _fake_fdr_client()
proj = CovarianceProjector(fdr_client=fdr)
cov = _spd_6x6()
cov[2, 2] = math.nan
out = _output(cov)
# Act + Assert
with pytest.raises(FcEmitError, match=r"NaN"):
proj.to_ardupilot_horiz_accuracy_m(out)
assert fdr.enqueue.call_args.args[0].payload["kv"]["reason"] == "nan_or_inf"
def test_ac5_inf_covariance_rejected() -> None:
# Arrange
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
cov = _spd_6x6()
cov[2, 2] = math.inf
out = _output(cov)
# Act + Assert
with pytest.raises(FcEmitError):
proj.to_ardupilot_horiz_accuracy_m(out)
def test_ac5_missing_covariance_rejected() -> None:
# Arrange
fdr = _fake_fdr_client()
proj = CovarianceProjector(fdr_client=fdr)
out = _output(cov=None)
# Act + Assert
with pytest.raises(FcEmitError, match=r"missing"):
proj.to_ardupilot_horiz_accuracy_m(out)
assert fdr.enqueue.call_args.args[0].payload["kv"]["reason"] == "missing"
def test_ac5_wrong_shape_rejected() -> None:
# Arrange
fdr = _fake_fdr_client()
proj = CovarianceProjector(fdr_client=fdr)
out = _output(cov=np.eye(5))
# Act + Assert
with pytest.raises(FcEmitError, match=r"6x6"):
proj.to_ardupilot_horiz_accuracy_m(out)
assert fdr.enqueue.call_args.args[0].payload["kv"]["reason"] == "bad_shape"
# ----------------------------------------------------------------------
# AC-6: bit-stable across calls
def test_ac6_bit_stable_repeated_calls() -> None:
# Arrange — repeated calls on identical input yield bit-identical output
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=3.7, sigma_yy=2.1, sigma_xy=0.4))
# Act
results = [proj.to_ardupilot_horiz_accuracy_m(out) for _ in range(20)]
# Assert — every call produces exactly the same float
assert len(set(results)) == 1
def test_ac6_ap_and_inav_round_trip_consistent() -> None:
# Arrange — AP m * 1000 (round-half-up) must equal iNav mm
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=2.0, sigma_yy=2.0))
# Act
radius_m = proj.to_ardupilot_horiz_accuracy_m(out)
radius_mm = proj.to_inav_h_pos_accuracy_mm(out)
# Assert
assert radius_mm == math.floor(radius_m * 1000.0 + 0.5)
# ----------------------------------------------------------------------
# AC-7: iNav clamp at 65535
def test_ac7_inav_clamps_at_uint16_max(caplog: pytest.LogCaptureFixture) -> None:
# Arrange — sigma huge enough that radius_mm > 65535
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
huge_sigma = 1e8 # radius ~ sqrt(1e8) m = 10000 m = 10_000_000 mm
out = _output(_spd_6x6(sigma_xx=huge_sigma, sigma_yy=huge_sigma))
# Act
with caplog.at_level(logging.WARNING, logger="c8_fc_adapter.cov_projector"):
result = proj.to_inav_h_pos_accuracy_mm(out)
# Assert
assert result == 65535
clamp_records = [
r for r in caplog.records if getattr(r, "kind", None) == "c8.cov_projector.inav_clamped"
]
assert len(clamp_records) == 1
assert clamp_records[0].kv["clamped_to"] == 65535
assert clamp_records[0].kv["frame_id"] == 7
def test_ac7_inav_exact_at_uint16_max_not_clamped() -> None:
# Arrange — pick sigma_xx so that radius_mm == 65535 exactly
# radius_m = 65.535, sigma = 65.535^2 = 4294.838...
sigma = (65.535) ** 2
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=sigma, sigma_yy=sigma))
# Act
result = proj.to_inav_h_pos_accuracy_mm(out)
# Assert — at the boundary, no clamp
assert result == 65535
# ----------------------------------------------------------------------
# NFR: perf (loose budget — projector must be fast enough for 20 Hz emit)
def test_nfr_perf_projector_under_100us_per_call() -> None:
# Arrange
import time
proj = CovarianceProjector(fdr_client=_fake_fdr_client())
out = _output(_spd_6x6(sigma_xx=2.0, sigma_yy=3.0, sigma_xy=0.5))
iters = 1000
# Act
start = time.perf_counter()
for _ in range(iters):
proj.to_ardupilot_horiz_accuracy_m(out)
avg_s = (time.perf_counter() - start) / iters
# Assert
assert avg_s < 100e-6, f"avg {avg_s * 1e6:.1f}us > 100us budget"
+15 -1
View File
@@ -1,4 +1,4 @@
"""C8 FC Adapter smoke test — AC-9."""
"""C8 FC Adapter smoke test — AC-9 (legacy) + AZ-390 public-API gate."""
def test_interface_importable() -> None:
@@ -12,3 +12,17 @@ def test_interface_importable() -> None:
for sym in (FcAdapter, GcsAdapter, ReplaySink, EmittedExternalPosition):
assert sym is not None
def test_internal_modules_not_in_public_all() -> None:
"""AZ-390 AC-8: only the contract symbols appear in ``__all__``."""
# Arrange
from gps_denied_onboard.components import c8_fc_adapter
# Assert
assert set(c8_fc_adapter.__all__) == {
"EmittedExternalPosition",
"FcAdapter",
"GcsAdapter",
"ReplaySink",
}