# Flight Database Layer ## Interface Definition **Interface Name**: `IFlightDatabase` ### Interface Methods ```python class IFlightDatabase(ABC): # Flight Operations @abstractmethod def insert_flight(self, flight: Flight) -> str: pass @abstractmethod def update_flight(self, flight: Flight) -> bool: pass @abstractmethod def query_flights(self, filters: Dict[str, Any], limit: int, offset: int) -> List[Flight]: pass @abstractmethod def get_flight_by_id(self, flight_id: str) -> Optional[Flight]: pass @abstractmethod def delete_flight(self, flight_id: str) -> bool: pass # Waypoint Operations @abstractmethod def get_waypoints(self, flight_id: str, limit: Optional[int] = None) -> List[Waypoint]: pass @abstractmethod def insert_waypoint(self, flight_id: str, waypoint: Waypoint) -> str: pass @abstractmethod def update_waypoint(self, flight_id: str, waypoint_id: str, waypoint: Waypoint) -> bool: pass @abstractmethod def batch_update_waypoints(self, flight_id: str, waypoints: List[Waypoint]) -> BatchResult: pass # Flight State Operations @abstractmethod def save_flight_state(self, flight_state: FlightState) -> bool: pass @abstractmethod def load_flight_state(self, flight_id: str) -> Optional[FlightState]: pass @abstractmethod def query_processing_history(self, filters: Dict[str, Any]) -> List[FlightState]: pass # Frame Result Operations @abstractmethod def save_frame_result(self, flight_id: str, frame_result: FrameResult) -> bool: pass @abstractmethod def get_frame_results(self, flight_id: str) -> List[FrameResult]: pass # Heading History Operations @abstractmethod def save_heading(self, flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool: pass @abstractmethod def get_heading_history(self, flight_id: str, last_n: Optional[int] = None) -> List[HeadingRecord]: pass @abstractmethod def get_latest_heading(self, flight_id: str) -> Optional[float]: pass # Image Storage Operations @abstractmethod def save_image_metadata(self, flight_id: str, frame_id: int, file_path: str, metadata: Dict) -> bool: pass @abstractmethod def get_image_path(self, flight_id: str, frame_id: int) -> Optional[str]: pass @abstractmethod def get_image_metadata(self, flight_id: str, frame_id: int) -> Optional[Dict]: pass # Chunk State Operations @abstractmethod def save_chunk_state(self, flight_id: str, chunk: ChunkHandle) -> bool: pass @abstractmethod def load_chunk_states(self, flight_id: str) -> List[ChunkHandle]: pass @abstractmethod def delete_chunk_state(self, flight_id: str, chunk_id: str) -> bool: pass ``` ## Component Description ### Responsibilities - Direct database access layer for all flight-related data - Execute SQL queries and commands - Manage database connections and transactions - Handle connection pooling and retry logic - Provide database abstraction (PostgreSQL, MySQL, etc.) - Persist flight state, waypoints, frame results - Store heading history for rotation management - Store image file paths and metadata ### Scope - CRUD operations on flights table - CRUD operations on waypoints table - CRUD operations on geofences table - Flight state persistence - Frame result storage - Heading history tracking - Image metadata storage - Query optimization for large datasets --- ## Flight Operations ### `insert_flight(flight: Flight) -> str` **Description**: Inserts a new flight with initial waypoints and geofences. **Called By**: - F02 Flight Processor **Input**: ```python Flight: id: str name: str description: str start_gps: GPSPoint rough_waypoints: List[Waypoint] geofences: Geofences camera_params: CameraParameters altitude: float created_at: datetime updated_at: datetime ``` **Output**: ```python flight_id: str ``` **Database Operations**: 1. Begin transaction 2. INSERT INTO flights 3. INSERT INTO waypoints for each initial waypoint 4. INSERT INTO geofences for each polygon 5. INSERT INTO flight_state (initial state) 6. Commit transaction **Error Conditions**: - `IntegrityError`: Duplicate flight_id - `DatabaseError`: Connection error, transaction failure - Automatic rollback on error **Test Cases**: 1. **Insert flight with 100 waypoints**: All data persisted 2. **Duplicate flight_id**: Raises IntegrityError 3. **Transaction rollback**: Error mid-insert → complete rollback --- ### `update_flight(flight: Flight) -> bool` **Description**: Updates flight metadata. **Called By**: - F02 Flight Processor **Input**: ```python Flight with updated fields ``` **Output**: ```python bool: True if updated, False if not found ``` **Database Operations**: ```sql UPDATE flights SET name = ?, description = ?, updated_at = ? WHERE id = ? ``` **Test Cases**: 1. **Update existing flight**: Returns True 2. **Update non-existent flight**: Returns False --- ### `query_flights(filters: Dict[str, Any], limit: int, offset: int) -> List[Flight]` **Description**: Queries flights with filtering and pagination. **Called By**: - F02 Flight Processor (listing) - F01 Flight API **Input**: ```python filters: Dict[str, Any] # e.g., {"name": "Mission%", "status": "completed"} limit: int offset: int ``` **Output**: ```python List[Flight] # Metadata only, without full waypoint data ``` **Test Cases**: 1. **Filter by name**: Returns matching flights 2. **Pagination**: offset=100, limit=50 → returns flights 100-149 3. **No matches**: Returns [] --- ### `get_flight_by_id(flight_id: str) -> Optional[Flight]` **Description**: Retrieves complete flight with all waypoints. **Called By**: - F02 Flight Processor **Input**: ```python flight_id: str ``` **Output**: ```python Optional[Flight] # Complete flight with all waypoints ``` **Database Operations**: 1. SELECT FROM flights WHERE id = ? 2. SELECT FROM waypoints WHERE flight_id = ? ORDER BY timestamp 3. SELECT FROM geofences WHERE flight_id = ? 4. Assemble Flight object **Test Cases**: 1. **Existing flight**: Returns complete Flight 2. **Non-existent flight**: Returns None 3. **Large flight (3000 waypoints)**: Returns within 150ms --- ### `delete_flight(flight_id: str) -> bool` **Description**: Deletes a flight and cascades to all related data. **Called By**: - F02 Flight Processor **Input**: ```python flight_id: str ``` **Output**: ```python bool: True if deleted, False if not found ``` **Database Operations**: ```sql DELETE FROM flights WHERE id = ? -- Cascade deletes via FK constraints: -- waypoints, geofences, flight_state, frame_results, -- heading_history, flight_images ``` **Test Cases**: 1. **Delete flight**: Cascades to all related tables 2. **Non-existent flight**: Returns False --- ## Waypoint Operations ### `get_waypoints(flight_id: str, limit: Optional[int] = None) -> List[Waypoint]` **Description**: Retrieves waypoints for a flight. **Called By**: - F02 Flight Processor **Input**: ```python flight_id: str limit: Optional[int] ``` **Output**: ```python List[Waypoint] ``` **Test Cases**: 1. **All waypoints**: limit=None → returns all 2. **Limited**: limit=100 → returns first 100 --- ### `insert_waypoint(flight_id: str, waypoint: Waypoint) -> str` **Description**: Inserts a new waypoint. **Called By**: - F02 Flight Processor **Input**: ```python flight_id: str waypoint: Waypoint ``` **Output**: ```python waypoint_id: str ``` **Test Cases**: 1. **Valid insertion**: Returns waypoint_id 2. **Non-existent flight**: Raises ForeignKeyError --- ### `update_waypoint(flight_id: str, waypoint_id: str, waypoint: Waypoint) -> bool` **Description**: Updates a waypoint. Critical path for GPS refinement updates. **Called By**: - F02 Flight Processor - F14 Result Manager **Input**: ```python flight_id: str waypoint_id: str waypoint: Waypoint ``` **Output**: ```python bool: True if updated, False if not found ``` **Database Operations**: ```sql UPDATE waypoints SET lat = ?, lon = ?, altitude = ?, confidence = ?, refined = ? WHERE id = ? AND flight_id = ? ``` **Optimization**: - Prepared statement caching - Connection pooling - Indexed on (flight_id, id) **Test Cases**: 1. **Update existing**: Returns True 2. **Non-existent**: Returns False 3. **High-frequency**: 100 updates/sec sustained --- ### `batch_update_waypoints(flight_id: str, waypoints: List[Waypoint]) -> BatchResult` **Description**: Updates multiple waypoints in a single transaction. **Called By**: - F02 Flight Processor (asynchronous refinements) **Input**: ```python flight_id: str waypoints: List[Waypoint] ``` **Output**: ```python BatchResult: success: bool updated_count: int failed_ids: List[str] ``` **Test Cases**: 1. **Batch update 100**: All succeed 2. **Partial failure**: Returns failed_ids --- ## Flight State Operations ### `save_flight_state(flight_state: FlightState) -> bool` **Description**: Saves or updates flight processing state. **Called By**: - F02 Flight Processor **Input**: ```python FlightState: flight_id: str status: str frames_processed: int frames_total: int current_frame: Optional[int] current_heading: Optional[float] blocked: bool search_grid_size: Optional[int] created_at: datetime updated_at: datetime ``` **Output**: ```python bool: True if saved ``` **Test Cases**: 1. Save state → persisted 2. Update state → overwrites --- ### `load_flight_state(flight_id: str) -> Optional[FlightState]` **Description**: Loads flight state (for crash recovery). **Called By**: - F02 Flight Processor **Output**: ```python Optional[FlightState] ``` **Test Cases**: 1. Load existing → returns state 2. Load non-existent → returns None --- ### `query_processing_history(filters: Dict[str, Any]) -> List[FlightState]` **Description**: Queries historical processing data. **Called By**: - Analytics, admin tools **Test Cases**: 1. Query by date range → returns flights 2. Query by status → returns filtered --- ## Frame Result Operations ### `save_frame_result(flight_id: str, frame_result: FrameResult) -> bool` **Description**: Saves frame processing result. **Called By**: - F14 Result Manager **Input**: ```python FrameResult: frame_id: int gps_center: GPSPoint altitude: float heading: float confidence: float refined: bool timestamp: datetime ``` **Output**: ```python bool: True if saved ``` **Test Cases**: 1. Save result → persisted 2. Update on refinement → overwrites --- ### `get_frame_results(flight_id: str) -> List[FrameResult]` **Description**: Gets all frame results for flight. **Called By**: - F14 Result Manager **Test Cases**: 1. Get results → returns all frames 2. No results → returns empty list --- ## Heading History Operations ### `save_heading(flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool` **Description**: Saves heading value for temporal smoothing and recovery. **Called By**: - F06 Image Rotation Manager **Input**: ```python flight_id: str frame_id: int heading: float # Degrees 0-360 timestamp: datetime ``` **Output**: ```python bool: True if saved ``` **Test Cases**: 1. **Save heading**: Persisted correctly 2. **Overwrite heading**: Same frame_id → updates value --- ### `get_heading_history(flight_id: str, last_n: Optional[int] = None) -> List[HeadingRecord]` **Description**: Retrieves heading history for smoothing calculations. **Called By**: - F06 Image Rotation Manager **Input**: ```python flight_id: str last_n: Optional[int] # Get last N headings, or all if None ``` **Output**: ```python List[HeadingRecord]: - frame_id: int - heading: float - timestamp: datetime ``` **Test Cases**: 1. **Get all**: Returns complete history 2. **Get last 10**: Returns 10 most recent --- ### `get_latest_heading(flight_id: str) -> Optional[float]` **Description**: Gets most recent heading for pre-rotation. **Called By**: - F06 Image Rotation Manager **Output**: ```python Optional[float]: Heading in degrees, or None if no history ``` **Test Cases**: 1. **Has history**: Returns latest heading 2. **No history**: Returns None --- ## Image Storage Operations ### `save_image_metadata(flight_id: str, frame_id: int, file_path: str, metadata: Dict) -> bool` **Description**: Saves image file path and metadata (original filename, dimensions, etc.). **Called By**: - F05 Image Input Pipeline **Input**: ```python flight_id: str frame_id: int file_path: str # Path where image is stored metadata: Dict # {original_name, width, height, file_size, upload_time, ...} ``` **Output**: ```python bool: True if saved ``` **Test Cases**: 1. **Save metadata**: Persisted with file_path 2. **Overwrite**: Same frame_id → updates --- ### `get_image_path(flight_id: str, frame_id: int) -> Optional[str]` **Description**: Gets stored image file path. **Called By**: - F05 Image Input Pipeline **Output**: ```python Optional[str]: File path or None ``` **Test Cases**: 1. **Exists**: Returns file path 2. **Not exists**: Returns None --- ### `get_image_metadata(flight_id: str, frame_id: int) -> Optional[Dict]` **Description**: Gets image metadata. **Called By**: - F05 Image Input Pipeline **Output**: ```python Optional[Dict]: Metadata dictionary or None ``` --- ## Chunk State Operations ### `save_chunk_state(flight_id: str, chunk: ChunkHandle) -> bool` **Description**: Saves chunk state to database for crash recovery. **Called By**: - F12 Route Chunk Manager (after chunk state changes) **Input**: ```python flight_id: str chunk: ChunkHandle: chunk_id: str start_frame_id: int end_frame_id: Optional[int] frames: List[int] is_active: bool has_anchor: bool anchor_frame_id: Optional[int] anchor_gps: Optional[GPSPoint] matching_status: str ``` **Output**: ```python bool: True if saved successfully ``` **Database Operations**: ```sql INSERT INTO chunks (chunk_id, flight_id, start_frame_id, end_frame_id, frames, is_active, has_anchor, anchor_frame_id, anchor_lat, anchor_lon, matching_status, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW()) ON CONFLICT (chunk_id) UPDATE SET ... ``` **Test Cases**: 1. **Save new chunk**: Persisted successfully 2. **Update existing chunk**: State updated 3. **Multiple chunks**: All persisted correctly --- ### `load_chunk_states(flight_id: str) -> List[ChunkHandle]` **Description**: Loads all chunk states for a flight (for crash recovery). **Called By**: - F12 Route Chunk Manager (on flight resume) - System startup (recovery) **Input**: ```python flight_id: str ``` **Output**: ```python List[ChunkHandle]: All chunks for the flight ``` **Test Cases**: 1. **Load chunks**: Returns all chunks 2. **No chunks**: Returns empty list 3. **Crash recovery**: Chunks restored correctly --- ### `delete_chunk_state(flight_id: str, chunk_id: str) -> bool` **Description**: Deletes chunk state from database. **Called By**: - F12 Route Chunk Manager (after chunk merged/deleted) **Input**: ```python flight_id: str chunk_id: str ``` **Output**: ```python bool: True if deleted ``` **Test Cases**: 1. **Delete chunk**: Removed from database 2. **Non-existent chunk**: Returns False --- ## Integration Tests ### Test 1: Complete Flight Lifecycle 1. insert_flight() with 500 waypoints 2. save_flight_state() with initial state 3. update_waypoint() × 100 4. save_frame_result() × 500 5. save_heading() × 500 6. get_flight_by_id() and verify all data 7. delete_flight() and verify cascade ### Test 2: High-Frequency Update Pattern 1. insert_flight() with 2000 waypoints 2. Concurrent: update_waypoint(), save_frame_result(), save_heading() 3. Measure throughput > 200 updates/sec 4. Verify all data persisted ### Test 3: Crash Recovery 1. Insert flight, process 500 frames 2. Simulate crash (kill process) 3. Restart, load_flight_state() 4. Verify state intact, resume processing --- ## Non-Functional Requirements ### Performance - **insert_flight**: < 200ms for 100 waypoints - **update_waypoint**: < 30ms (critical path) - **get_flight_by_id**: < 100ms for 2000 waypoints - **save_heading**: < 10ms - **Throughput**: 200+ operations per second ### Scalability - Connection pool: 50-100 connections - Support 100+ concurrent flights - Handle tables with millions of records ### Reliability - ACID transaction guarantees - Automatic retry on transient errors (3 attempts) - Connection health checks ### Security - SQL injection prevention (parameterized queries) - Least privilege database permissions - Connection string encryption --- ## Dependencies ### Internal Components - None (lowest layer) ### External Dependencies - **PostgreSQL** or **MySQL** - **SQLAlchemy** or **psycopg2** - **Alembic**: Schema migrations --- ## Database Schema ```sql -- Flights table CREATE TABLE flights ( id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL, description TEXT, start_lat DECIMAL(10, 7) NOT NULL, start_lon DECIMAL(11, 7) NOT NULL, altitude DECIMAL(7, 2) NOT NULL, camera_params JSONB NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, INDEX idx_created_at (created_at), INDEX idx_name (name) ); -- Waypoints table CREATE TABLE waypoints ( id VARCHAR(36) PRIMARY KEY, flight_id VARCHAR(36) NOT NULL, lat DECIMAL(10, 7) NOT NULL, lon DECIMAL(11, 7) NOT NULL, altitude DECIMAL(7, 2), confidence DECIMAL(3, 2) NOT NULL, timestamp TIMESTAMP NOT NULL, refined BOOLEAN NOT NULL DEFAULT FALSE, FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, INDEX idx_flight_timestamp (flight_id, timestamp), INDEX idx_flight_id (flight_id, id) ); -- Geofences table CREATE TABLE geofences ( id VARCHAR(36) PRIMARY KEY, flight_id VARCHAR(36) NOT NULL, nw_lat DECIMAL(10, 7) NOT NULL, nw_lon DECIMAL(11, 7) NOT NULL, se_lat DECIMAL(10, 7) NOT NULL, se_lon DECIMAL(11, 7) NOT NULL, FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, INDEX idx_geofence_flight (flight_id) ); -- Flight state table CREATE TABLE flight_state ( flight_id VARCHAR(36) PRIMARY KEY, status VARCHAR(50) NOT NULL, frames_processed INT NOT NULL DEFAULT 0, frames_total INT NOT NULL DEFAULT 0, current_frame INT, current_heading FLOAT, blocked BOOLEAN NOT NULL DEFAULT FALSE, search_grid_size INT, created_at TIMESTAMP NOT NULL, updated_at TIMESTAMP NOT NULL, FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE ); -- Frame results table CREATE TABLE frame_results ( id VARCHAR(36) PRIMARY KEY, flight_id VARCHAR(36) NOT NULL, frame_id INT NOT NULL, gps_lat DECIMAL(10, 7), gps_lon DECIMAL(11, 7), altitude FLOAT, heading FLOAT, confidence FLOAT, refined BOOLEAN DEFAULT FALSE, timestamp TIMESTAMP, updated_at TIMESTAMP, FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, UNIQUE KEY (flight_id, frame_id), INDEX idx_frame_flight (flight_id, frame_id) ); -- Heading history table CREATE TABLE heading_history ( flight_id VARCHAR(36) NOT NULL, frame_id INT NOT NULL, heading FLOAT NOT NULL, timestamp TIMESTAMP NOT NULL, PRIMARY KEY (flight_id, frame_id), FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, INDEX idx_heading_flight (flight_id, frame_id DESC) ); -- Flight images table CREATE TABLE flight_images ( flight_id VARCHAR(36) NOT NULL, frame_id INT NOT NULL, file_path VARCHAR(500) NOT NULL, metadata JSONB, uploaded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (flight_id, frame_id), FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, INDEX idx_images_flight (flight_id, frame_id) ); -- Chunks table CREATE TABLE chunks ( chunk_id VARCHAR(36) PRIMARY KEY, flight_id VARCHAR(36) NOT NULL, start_frame_id INT NOT NULL, end_frame_id INT, frames JSONB NOT NULL, is_active BOOLEAN NOT NULL DEFAULT TRUE, has_anchor BOOLEAN NOT NULL DEFAULT FALSE, anchor_frame_id INT, anchor_lat DECIMAL(10, 7), anchor_lon DECIMAL(11, 7), matching_status VARCHAR(50) NOT NULL DEFAULT 'unanchored', created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE, INDEX idx_chunks_flight (flight_id), INDEX idx_chunks_active (flight_id, is_active), INDEX idx_chunks_matching (flight_id, matching_status) ); ``` --- ## Data Models ### Flight ```python class Flight(BaseModel): id: str name: str description: str start_gps: GPSPoint waypoints: List[Waypoint] geofences: Geofences camera_params: CameraParameters altitude: float created_at: datetime updated_at: datetime ``` ### FlightState ```python class FlightState(BaseModel): flight_id: str status: str frames_processed: int frames_total: int current_frame: Optional[int] current_heading: Optional[float] blocked: bool search_grid_size: Optional[int] created_at: datetime updated_at: datetime ``` ### FrameResult ```python class FrameResult(BaseModel): frame_id: int gps_center: GPSPoint altitude: float heading: float confidence: float refined: bool timestamp: datetime updated_at: datetime ``` ### HeadingRecord ```python class HeadingRecord(BaseModel): frame_id: int heading: float timestamp: datetime ``` ### BatchResult ```python class BatchResult(BaseModel): success: bool updated_count: int failed_ids: List[str] ``` ### DatabaseConfig ```python class DatabaseConfig(BaseModel): host: str port: int database: str username: str password: str pool_size: int = 50 max_overflow: int = 50 pool_timeout: int = 30 pool_recycle: int = 3600 ```