Files
gps-denied-onboard/components/sse_event_streamer/sse_event_streamer.py
T
Oleksandr Bezdieniezhnykh abc26d5c20 initial structure implemented
docs -> _docs
2025-12-01 14:20:56 +02:00

39 lines
1.0 KiB
Python

from typing import AsyncIterator
from .base import SSEEventStreamerBase
from models.results import FrameResult
from models.recovery import UserInputRequest
class SSEEventStreamer(SSEEventStreamerBase):
async def emit_frame_result(
self, flight_id: str, result: FrameResult
) -> None:
raise NotImplementedError
async def emit_status_update(
self, flight_id: str, status: dict
) -> None:
raise NotImplementedError
async def emit_user_input_request(
self, flight_id: str, request: UserInputRequest
) -> None:
raise NotImplementedError
async def emit_error(
self, flight_id: str, error: str
) -> None:
raise NotImplementedError
async def subscribe(self, flight_id: str) -> AsyncIterator[dict]:
raise NotImplementedError
yield
async def unsubscribe(self, flight_id: str, subscriber_id: str) -> None:
raise NotImplementedError
async def get_subscriber_count(self, flight_id: str) -> int:
raise NotImplementedError