Files
gps-denied-onboard/tests/unit/test_mavlink_gcs_integration.py
Oleksandr Bezdieniezhnykh e86084da6b [AZ-223] [AZ-224] [AZ-225] [AZ-227] Add runtime gateways
Implement the first runtime component boundaries around the shared
contracts so downstream batches can consume typed frame, MAVLink, tile,
and FDR behavior with focused tests and batch evidence.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 18:01:13 +03:00

73 lines
2.2 KiB
Python

from shared.contracts import PositionEstimate
from mavlink_gcs_integration import (
FlightControllerTelemetry,
InMemoryMavlinkGateway,
OperatorStatusMessage,
)
def test_telemetry_subscription_emits_normalized_sample() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
telemetry = FlightControllerTelemetry(
timestamp_ns=1_000,
acceleration_mps2=(0.1, 0.2, -9.8),
attitude_rad=(0.01, 0.02, 1.57),
altitude_m=250.0,
airspeed_mps=17.5,
gps_health="lost",
)
# Act
samples = gateway.subscribe_telemetry([telemetry])
# Assert
assert len(samples) == 1
assert samples[0].imu["accel_z"] == -9.8
assert samples[0].attitude["yaw"] == 1.57
assert samples[0].gps_health == "lost"
def test_invalid_gps_input_estimate_is_rejected_without_emission() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
estimate = PositionEstimate(
timestamp_ns=2_000,
latitude_deg=49.9,
longitude_deg=36.2,
altitude_m=250.0,
covariance_semimajor_m=10.0,
source_label="no_fix",
fix_type=1,
horizontal_accuracy_m=10.0,
anchor_age_ms=0,
)
# Act
result = gateway.emit_gps_input(estimate)
# Assert
assert result.emitted is False
assert result.error is not None
assert result.error.category == "validation"
assert gateway.emitted_gps_inputs == []
def test_operator_status_messages_are_rate_limited_by_text() -> None:
# Arrange
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
messages = [
OperatorStatusMessage(timestamp_ns=1_000, severity="warning", text="GPS denied"),
OperatorStatusMessage(timestamp_ns=1_500, severity="warning", text="GPS denied"),
OperatorStatusMessage(timestamp_ns=2_100, severity="warning", text="GPS denied"),
]
# Act
result = gateway.emit_status(messages)
# Assert
assert [message.timestamp_ns for message in result.emitted] == [1_000, 2_100]
assert [message.timestamp_ns for message in result.suppressed] == [1_500]
assert len(gateway.emitted_status_messages) == 2