add features

This commit is contained in:
Oleksandr Bezdieniezhnykh
2025-12-01 01:07:46 +02:00
parent 97f558b3d7
commit 54be35fde7
81 changed files with 4618 additions and 10 deletions
@@ -0,0 +1,61 @@
# Feature: Connection Lifecycle Management
## Description
Manages the lifecycle of SSE (Server-Sent Events) connections including creation, tracking, health monitoring, and graceful closure. Supports multiple concurrent client connections per flight with proper resource management.
## Component APIs Implemented
### `create_stream(flight_id: str, client_id: str) -> StreamConnection`
Establishes a new SSE connection for a client subscribing to a flight's events.
### `close_stream(flight_id: str, client_id: str) -> bool`
Closes an existing SSE connection and cleans up associated resources.
### `get_active_connections(flight_id: str) -> int`
Returns the count of active SSE connections for a given flight.
## External Tools and Services
- **FastAPI/Starlette**: SSE response streaming support
- **asyncio**: Asynchronous connection handling and concurrent stream management
## Internal Methods
| Method | Purpose |
|--------|---------|
| `_register_connection(flight_id, client_id, stream)` | Adds connection to internal registry |
| `_unregister_connection(flight_id, client_id)` | Removes connection from registry |
| `_get_connections_for_flight(flight_id)` | Returns all active streams for a flight |
| `_generate_stream_id()` | Creates unique identifier for stream |
| `_handle_client_disconnect(flight_id, client_id)` | Cleanup on unexpected disconnect |
## Data Structures
```python
# Connection registry: Dict[flight_id, Dict[client_id, StreamConnection]]
# Allows O(1) lookup by flight and client
```
## Unit Tests
| Test | Description |
|------|-------------|
| `test_create_stream_returns_valid_connection` | Verify StreamConnection has all required fields |
| `test_create_stream_unique_stream_ids` | Multiple streams get unique identifiers |
| `test_close_stream_removes_from_registry` | Connection no longer tracked after close |
| `test_close_stream_nonexistent_returns_false` | Graceful handling of invalid close |
| `test_get_active_connections_empty` | Returns 0 when no connections exist |
| `test_get_active_connections_multiple_clients` | Correctly counts concurrent connections |
| `test_multiple_flights_isolated` | Connections for different flights don't interfere |
| `test_same_client_reconnect` | Client can reconnect with same client_id |
## Integration Tests
| Test | Description |
|------|-------------|
| `test_concurrent_client_connections` | 10 clients connect simultaneously to same flight |
| `test_connection_survives_idle_period` | Connection remains open during inactivity |
| `test_client_disconnect_detection` | System detects when client drops connection |
| `test_connection_cleanup_on_flight_completion` | All connections closed when flight ends |
@@ -0,0 +1,83 @@
# Feature: Event Broadcasting
## Description
Broadcasts various event types to all connected clients for a flight. Handles event formatting per SSE protocol, buffering events for temporarily disconnected clients, and replay of missed events on reconnection. Includes heartbeat mechanism for connection health.
## Component APIs Implemented
### `send_frame_result(flight_id: str, frame_result: FrameResult) -> bool`
Broadcasts `frame_processed` event with GPS coordinates, confidence, and metadata.
### `send_search_progress(flight_id: str, search_status: SearchStatus) -> bool`
Broadcasts `search_expanded` event during failure recovery search operations.
### `send_user_input_request(flight_id: str, request: UserInputRequest) -> bool`
Broadcasts `user_input_needed` event when processing requires user intervention.
### `send_refinement(flight_id: str, frame_id: int, updated_result: FrameResult) -> bool`
Broadcasts `frame_refined` event when factor graph optimization improves a position.
### `send_heartbeat(flight_id: str) -> bool`
Sends keepalive ping to all clients to maintain connection health.
## External Tools and Services
- **FastAPI/Starlette**: SSE response streaming
- **asyncio**: Async event delivery to multiple clients
- **JSON**: Event data serialization
## Internal Methods
| Method | Purpose |
|--------|---------|
| `_format_sse_event(event_type, event_id, data)` | Formats data per SSE protocol spec |
| `_broadcast_to_flight(flight_id, event)` | Sends event to all clients of a flight |
| `_buffer_event(flight_id, event)` | Stores event for replay to reconnecting clients |
| `_get_buffered_events(flight_id, last_event_id)` | Retrieves events after given ID for replay |
| `_prune_event_buffer(flight_id)` | Removes old events beyond retention window |
| `_generate_event_id(frame_id, event_type)` | Creates sequential event ID for ordering |
## Event Types
| Event | Format |
|-------|--------|
| `frame_processed` | `{frame_id, gps, altitude, confidence, heading, timestamp}` |
| `frame_refined` | `{frame_id, gps, refined: true}` |
| `search_expanded` | `{frame_id, grid_size, status}` |
| `user_input_needed` | `{request_id, frame_id, candidate_tiles}` |
| `:heartbeat` | SSE comment (no data payload) |
## Buffering Strategy
- Events buffered per flight with configurable retention (default: 1000 events or 5 minutes)
- Events keyed by sequential ID for replay ordering
- On reconnection with `last_event_id`, replay all events after that ID
## Unit Tests
| Test | Description |
|------|-------------|
| `test_send_frame_result_formats_correctly` | Event matches expected JSON structure |
| `test_send_refinement_includes_refined_flag` | Refined events marked appropriately |
| `test_send_search_progress_valid_status` | Status field correctly populated |
| `test_send_user_input_request_has_request_id` | Request includes unique identifier |
| `test_send_heartbeat_sse_comment_format` | Heartbeat uses `:heartbeat` comment format |
| `test_buffer_event_stores_in_order` | Events retrievable in sequence |
| `test_buffer_pruning_removes_old_events` | Buffer doesn't grow unbounded |
| `test_replay_from_last_event_id` | Correct subset returned for replay |
| `test_broadcast_returns_false_no_connections` | Graceful handling when no clients |
| `test_event_id_generation_sequential` | IDs increase monotonically |
## Integration Tests
| Test | Description |
|------|-------------|
| `test_100_frames_all_received_in_order` | Stream 100 frame results, verify completeness |
| `test_reconnection_replay` | Disconnect after 50 events, reconnect, receive 51-100 |
| `test_multiple_event_types_interleaved` | Mix of frame_processed and frame_refined events |
| `test_user_input_flow_roundtrip` | Send request, verify client receives |
| `test_heartbeat_every_30_seconds` | Verify keepalive timing |
| `test_event_latency_under_500ms` | Measure generation-to-receipt time |
| `test_high_throughput_100_events_per_second` | Sustained event rate handling |