mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-21 23:11:13 +00:00
e86084da6b
Implement the first runtime component boundaries around the shared contracts so downstream batches can consume typed frame, MAVLink, tile, and FDR behavior with focused tests and batch evidence. Co-authored-by: Cursor <cursoragent@cursor.com>
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
from shared.contracts import PositionEstimate
|
|
|
|
from mavlink_gcs_integration import (
|
|
FlightControllerTelemetry,
|
|
InMemoryMavlinkGateway,
|
|
OperatorStatusMessage,
|
|
)
|
|
|
|
|
|
def test_telemetry_subscription_emits_normalized_sample() -> None:
|
|
# Arrange
|
|
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
|
telemetry = FlightControllerTelemetry(
|
|
timestamp_ns=1_000,
|
|
acceleration_mps2=(0.1, 0.2, -9.8),
|
|
attitude_rad=(0.01, 0.02, 1.57),
|
|
altitude_m=250.0,
|
|
airspeed_mps=17.5,
|
|
gps_health="lost",
|
|
)
|
|
|
|
# Act
|
|
samples = gateway.subscribe_telemetry([telemetry])
|
|
|
|
# Assert
|
|
assert len(samples) == 1
|
|
assert samples[0].imu["accel_z"] == -9.8
|
|
assert samples[0].attitude["yaw"] == 1.57
|
|
assert samples[0].gps_health == "lost"
|
|
|
|
|
|
def test_invalid_gps_input_estimate_is_rejected_without_emission() -> None:
|
|
# Arrange
|
|
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
|
estimate = PositionEstimate(
|
|
timestamp_ns=2_000,
|
|
latitude_deg=49.9,
|
|
longitude_deg=36.2,
|
|
altitude_m=250.0,
|
|
covariance_semimajor_m=10.0,
|
|
source_label="no_fix",
|
|
fix_type=1,
|
|
horizontal_accuracy_m=10.0,
|
|
anchor_age_ms=0,
|
|
)
|
|
|
|
# Act
|
|
result = gateway.emit_gps_input(estimate)
|
|
|
|
# Assert
|
|
assert result.emitted is False
|
|
assert result.error is not None
|
|
assert result.error.category == "validation"
|
|
assert gateway.emitted_gps_inputs == []
|
|
|
|
|
|
def test_operator_status_messages_are_rate_limited_by_text() -> None:
|
|
# Arrange
|
|
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
|
messages = [
|
|
OperatorStatusMessage(timestamp_ns=1_000, severity="warning", text="GPS denied"),
|
|
OperatorStatusMessage(timestamp_ns=1_500, severity="warning", text="GPS denied"),
|
|
OperatorStatusMessage(timestamp_ns=2_100, severity="warning", text="GPS denied"),
|
|
]
|
|
|
|
# Act
|
|
result = gateway.emit_status(messages)
|
|
|
|
# Assert
|
|
assert [message.timestamp_ns for message in result.emitted] == [1_000, 2_100]
|
|
assert [message.timestamp_ns for message in result.suppressed] == [1_500]
|
|
assert len(gateway.emitted_status_messages) == 2
|