mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 21:01:13 +00:00
[AZ-223] [AZ-224] [AZ-225] [AZ-227] Add runtime gateways
Implement the first runtime component boundaries around the shared contracts so downstream batches can consume typed frame, MAVLink, tile, and FDR behavior with focused tests and batch evidence. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
from shared.contracts import PositionEstimate
|
||||
|
||||
from mavlink_gcs_integration import (
|
||||
FlightControllerTelemetry,
|
||||
InMemoryMavlinkGateway,
|
||||
OperatorStatusMessage,
|
||||
)
|
||||
|
||||
|
||||
def test_telemetry_subscription_emits_normalized_sample() -> None:
|
||||
# Arrange
|
||||
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
||||
telemetry = FlightControllerTelemetry(
|
||||
timestamp_ns=1_000,
|
||||
acceleration_mps2=(0.1, 0.2, -9.8),
|
||||
attitude_rad=(0.01, 0.02, 1.57),
|
||||
altitude_m=250.0,
|
||||
airspeed_mps=17.5,
|
||||
gps_health="lost",
|
||||
)
|
||||
|
||||
# Act
|
||||
samples = gateway.subscribe_telemetry([telemetry])
|
||||
|
||||
# Assert
|
||||
assert len(samples) == 1
|
||||
assert samples[0].imu["accel_z"] == -9.8
|
||||
assert samples[0].attitude["yaw"] == 1.57
|
||||
assert samples[0].gps_health == "lost"
|
||||
|
||||
|
||||
def test_invalid_gps_input_estimate_is_rejected_without_emission() -> None:
|
||||
# Arrange
|
||||
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
||||
estimate = PositionEstimate(
|
||||
timestamp_ns=2_000,
|
||||
latitude_deg=49.9,
|
||||
longitude_deg=36.2,
|
||||
altitude_m=250.0,
|
||||
covariance_semimajor_m=10.0,
|
||||
source_label="no_fix",
|
||||
fix_type=1,
|
||||
horizontal_accuracy_m=10.0,
|
||||
anchor_age_ms=0,
|
||||
)
|
||||
|
||||
# Act
|
||||
result = gateway.emit_gps_input(estimate)
|
||||
|
||||
# Assert
|
||||
assert result.emitted is False
|
||||
assert result.error is not None
|
||||
assert result.error.category == "validation"
|
||||
assert gateway.emitted_gps_inputs == []
|
||||
|
||||
|
||||
def test_operator_status_messages_are_rate_limited_by_text() -> None:
|
||||
# Arrange
|
||||
gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000)
|
||||
messages = [
|
||||
OperatorStatusMessage(timestamp_ns=1_000, severity="warning", text="GPS denied"),
|
||||
OperatorStatusMessage(timestamp_ns=1_500, severity="warning", text="GPS denied"),
|
||||
OperatorStatusMessage(timestamp_ns=2_100, severity="warning", text="GPS denied"),
|
||||
]
|
||||
|
||||
# Act
|
||||
result = gateway.emit_status(messages)
|
||||
|
||||
# Assert
|
||||
assert [message.timestamp_ns for message in result.emitted] == [1_000, 2_100]
|
||||
assert [message.timestamp_ns for message in result.suppressed] == [1_500]
|
||||
assert len(gateway.emitted_status_messages) == 2
|
||||
Reference in New Issue
Block a user