Files
gps-denied-onboard/components/sse_event_streamer/base.py
T
Oleksandr Bezdieniezhnykh abc26d5c20 initial structure implemented
docs -> _docs
2025-12-01 14:20:56 +02:00

45 lines
1015 B
Python

from abc import ABC, abstractmethod
from typing import AsyncIterator
from models.results import FrameResult
from models.recovery import UserInputRequest
class SSEEventStreamerBase(ABC):
@abstractmethod
async def emit_frame_result(
self, flight_id: str, result: FrameResult
) -> None:
pass
@abstractmethod
async def emit_status_update(
self, flight_id: str, status: dict
) -> None:
pass
@abstractmethod
async def emit_user_input_request(
self, flight_id: str, request: UserInputRequest
) -> None:
pass
@abstractmethod
async def emit_error(
self, flight_id: str, error: str
) -> None:
pass
@abstractmethod
def subscribe(self, flight_id: str) -> AsyncIterator[dict]:
pass
@abstractmethod
async def unsubscribe(self, flight_id: str, subscriber_id: str) -> None:
pass
@abstractmethod
async def get_subscriber_count(self, flight_id: str) -> int:
pass