Files
gps-denied-onboard/test_f15_sse_event_streamer.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

188 lines
7.3 KiB
Python

import pytest
import asyncio
import json
from datetime import datetime
from f15_sse_event_streamer import SSEEventStreamer
@pytest.fixture
def streamer():
    """Provide a fresh ``SSEEventStreamer`` configured with small limits.

    The replay buffer is capped at 10 events and each client queue at 5, so
    the tests that overflow a queue only need a handful of events.
    """
    limits = {"max_buffer_size": 10, "queue_maxsize": 5}
    return SSEEventStreamer(**limits)
@pytest.fixture
def dummy_frame():
    """Return a sample frame-result payload used by the formatting test."""
    position = {"lat": 48.123, "lon": 37.456}
    frame = {
        "frame_id": 237,
        "gps": position,
        "altitude": 800.0,
        "confidence": 0.95,
        "heading": 87.3,
        "timestamp": "2025-11-24T10:30:00Z",
    }
    return frame
class TestSSEEventStreamer:
    """Tests for ``SSEEventStreamer``: connection lifecycle, broadcasting and buffering.

    Every wait on the streamer is bounded by ``_TIMEOUT`` so that a regression
    which prevents a connection from registering, or an event from being
    delivered, fails the test with a timeout instead of hanging the whole run.
    """

    # Upper bound, in seconds, for any single wait performed by a test.
    _TIMEOUT = 2.0
    # Pause, in seconds, between two polls of the active-connection count.
    _POLL_INTERVAL = 0.01

    # --- Shared helpers (not collected by pytest: names do not start with "test") ---

    @classmethod
    async def _wait_for_connections(cls, streamer, flight_id, expected=1):
        """Poll until ``flight_id`` reports at least ``expected`` active connections.

        Raises:
            asyncio.TimeoutError: if the count is not reached within ``_TIMEOUT``.
        """

        async def _poll():
            while streamer.get_active_connections(flight_id) < expected:
                await asyncio.sleep(cls._POLL_INTERVAL)

        await asyncio.wait_for(_poll(), timeout=cls._TIMEOUT)

    @classmethod
    async def _open_stream(cls, streamer, flight_id, client_id, **stream_kwargs):
        """Open a stream and return the task awaiting its first event.

        The generator is kick-started through a task so that the connection
        gets registered; the helper returns once the streamer reports it.
        Extra keyword arguments are forwarded to ``create_stream``.
        """
        gen = streamer.create_stream(flight_id, client_id, **stream_kwargs)
        task = asyncio.create_task(gen.__anext__())
        await cls._wait_for_connections(streamer, flight_id)
        return task

    @classmethod
    async def _next_event(cls, awaitable):
        """Await ``awaitable`` (a task or ``__anext__()`` call) for at most ``_TIMEOUT``.

        Raises:
            asyncio.TimeoutError: if no event arrives in time.
        """
        return await asyncio.wait_for(awaitable, timeout=cls._TIMEOUT)

    @classmethod
    async def _cancel(cls, task):
        """Cancel ``task`` and give it a bounded chance to unwind.

        ``asyncio.wait`` never raises on timeout or on the task's own outcome,
        so this cleanup cannot turn a passing test into a failing one.
        """
        task.cancel()
        await asyncio.wait({task}, timeout=cls._TIMEOUT)
        if task.done() and not task.cancelled():
            # Mark any stored exception as retrieved; the outcome of a task we
            # are discarding is irrelevant to the test.
            task.exception()

    # --- 15.01 Connection Lifecycle Management ---

    @pytest.mark.asyncio
    async def test_create_and_close_stream(self, streamer):
        """A stream registers its client and ``close_stream`` removes it exactly once."""
        task = await self._open_stream(streamer, "flight_1", "client_1")

        assert streamer.get_active_connections("flight_1") == 1
        assert "client_1" in streamer._connections["flight_1"]

        # Close Stream
        assert streamer.close_stream("flight_1", "client_1") is True
        assert streamer.get_active_connections("flight_1") == 0

        # Closing non-existent is safe
        assert streamer.close_stream("flight_1", "client_1") is False

        await self._cancel(task)

    @pytest.mark.asyncio
    async def test_multiple_client_connections(self, streamer):
        """An event sent to a flight reaches every client connected to it."""
        gen1 = streamer.create_stream("flight_multi", "client_1")
        gen2 = streamer.create_stream("flight_multi", "client_2")

        t1 = asyncio.create_task(gen1.__anext__())
        t2 = asyncio.create_task(gen2.__anext__())

        await self._wait_for_connections(streamer, "flight_multi", expected=2)
        assert streamer.get_active_connections("flight_multi") == 2

        # Event should be broadcasted to both
        streamer.send_frame_result("flight_multi", {"msg": "hello"})

        ev1 = await self._next_event(t1)
        ev2 = await self._next_event(t2)

        assert json.loads(ev1["data"])["msg"] == "hello"
        assert json.loads(ev2["data"])["msg"] == "hello"

    @pytest.mark.asyncio
    async def test_slow_client_disconnect(self, streamer):
        """A client whose queue overflows is disconnected."""
        # Queue max size is 5 in our fixture
        task = await self._open_stream(streamer, "flight_slow", "client_slow")

        # Blast 10 events immediately without yielding processing time to consumer
        for i in range(10):
            streamer.send_frame_result("flight_slow", {"frame": i})

        # QueueFull exception should have been caught internally, closing the connection
        assert streamer.get_active_connections("flight_slow") == 0

        await self._cancel(task)

    # --- 15.02 Event Broadcasting & Buffering ---

    @pytest.mark.asyncio
    async def test_send_frame_result_formatting(self, streamer, dummy_frame):
        """A frame result is delivered as a ``frame_processed`` event with JSON data."""
        task = await self._open_stream(streamer, "flight_fmt", "client_fmt")

        streamer.send_frame_result("flight_fmt", dummy_frame)
        event = await self._next_event(task)

        assert event["event"] == "frame_processed"
        assert event["id"] == "1"

        parsed_data = json.loads(event["data"])
        assert parsed_data["frame_id"] == 237
        assert parsed_data["gps"]["lat"] == 48.123

    @pytest.mark.asyncio
    async def test_heartbeat_formatting(self, streamer):
        """A heartbeat is delivered as a comment containing ``heartbeat``."""
        task = await self._open_stream(streamer, "flight_hb", "client_hb")

        streamer.send_heartbeat("flight_hb")
        event = await self._next_event(task)

        assert "comment" in event
        assert "heartbeat" in event["comment"]

    @pytest.mark.asyncio
    async def test_replay_buffered_events_on_reconnect(self, streamer):
        """Reconnecting with ``last_event_id`` replays only the later buffered events."""
        streamer.send_frame_result("flight_buf", {"frame_id": 100})
        streamer.send_frame_result("flight_buf", {"frame_id": 101})
        streamer.send_frame_result("flight_buf", {"frame_id": 102})

        # We know IDs are "1", "2", "3". Let's reconnect from ID "1"
        gen = streamer.create_stream("flight_buf", "client_reconn", last_event_id="1")

        ev1 = await self._next_event(gen.__anext__())
        ev2 = await self._next_event(gen.__anext__())

        assert ev1["id"] == "2"
        assert ev2["id"] == "3"

    @pytest.mark.asyncio
    async def test_event_filtering(self, streamer):
        """A stream created with ``event_types`` receives only those event types."""
        task = await self._open_stream(
            streamer,
            "flight_flt",
            "client_flt",
            event_types=["user_input_needed"],
        )

        streamer.send_frame_result("flight_flt", {"frame_id": 1})  # Ignored
        streamer.send_user_input_request("flight_flt", {"req": "test"})  # Kept

        ev = await self._next_event(task)
        assert ev["event"] == "user_input_needed"

    @pytest.mark.asyncio
    async def test_send_trajectory_refined_event(self, streamer):
        """A generic ``trajectory_refined`` event keeps its type and payload."""
        task = await self._open_stream(streamer, "flight_tr", "client_tr")

        streamer.send_generic_event(
            "flight_tr",
            "trajectory_refined",
            {"refined_images": ["AD000001"], "refinement_reason": "new_gps_anchor"},
        )
        event = await self._next_event(task)

        assert event["event"] == "trajectory_refined"
        assert "new_gps_anchor" in event["data"]

    @pytest.mark.asyncio
    async def test_send_user_fix_applied_event(self, streamer):
        """A generic ``user_fix_applied`` event keeps its type."""
        task = await self._open_stream(streamer, "flight_uf", "client_uf")

        streamer.send_generic_event(
            "flight_uf",
            "user_fix_applied",
            {"image_id": "AD000015", "fixed_gps": {"lat": 48.0, "lon": 37.0}},
        )
        event = await self._next_event(task)

        assert event["event"] == "user_fix_applied"

    @pytest.mark.asyncio
    async def test_send_processing_error_event(self, streamer):
        """A generic ``processing_error`` event keeps its type."""
        task = await self._open_stream(streamer, "flight_err", "client_err")

        streamer.send_generic_event(
            "flight_err",
            "processing_error",
            {"error_type": "tracking_lost"},
        )
        event = await self._next_event(task)

        assert event["event"] == "processing_error"

    @pytest.mark.asyncio
    async def test_send_flight_status_event(self, streamer):
        """A generic ``flight_status`` event keeps its type."""
        task = await self._open_stream(streamer, "flight_stat", "client_stat")

        streamer.send_generic_event(
            "flight_stat",
            "flight_status",
            {"status": "completed", "total_images": 60},
        )
        event = await self._next_event(task)

        assert event["event"] == "flight_status"