Flight Database
Interface Definition
Interface Name: IFlightDatabase
Interface Methods
class IFlightDatabase(ABC):
# Transaction Support
@abstractmethod
def execute_transaction(self, operations: List[Callable]) -> bool:
"""Executes a list of DB operations atomically."""
pass
# Flight Operations
@abstractmethod
def insert_flight(self, flight: Flight) -> str:
pass
@abstractmethod
def update_flight(self, flight: Flight) -> bool:
pass
@abstractmethod
def query_flights(self, filters: Dict[str, Any], limit: int, offset: int) -> List[Flight]:
pass
@abstractmethod
def get_flight_by_id(self, flight_id: str) -> Optional[Flight]:
pass
@abstractmethod
def delete_flight(self, flight_id: str) -> bool:
pass
# Waypoint Operations
@abstractmethod
def get_waypoints(self, flight_id: str, limit: Optional[int] = None) -> List[Waypoint]:
pass
@abstractmethod
def insert_waypoint(self, flight_id: str, waypoint: Waypoint) -> str:
pass
@abstractmethod
def update_waypoint(self, flight_id: str, waypoint_id: str, waypoint: Waypoint) -> bool:
pass
@abstractmethod
def batch_update_waypoints(self, flight_id: str, waypoints: List[Waypoint]) -> BatchResult:
pass
# Flight State Operations
@abstractmethod
def save_flight_state(self, flight_state: FlightState) -> bool:
pass
@abstractmethod
def load_flight_state(self, flight_id: str) -> Optional[FlightState]:
pass
@abstractmethod
def query_processing_history(self, filters: Dict[str, Any]) -> List[FlightState]:
pass
# Frame Result Operations
@abstractmethod
def save_frame_result(self, flight_id: str, frame_result: FrameResult) -> bool:
pass
@abstractmethod
def get_frame_results(self, flight_id: str) -> List[FrameResult]:
pass
# Heading History Operations
@abstractmethod
def save_heading(self, flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool:
pass
@abstractmethod
def get_heading_history(self, flight_id: str, last_n: Optional[int] = None) -> List[HeadingRecord]:
pass
@abstractmethod
def get_latest_heading(self, flight_id: str) -> Optional[float]:
pass
# Image Storage Operations
@abstractmethod
def save_image_metadata(self, flight_id: str, frame_id: int, file_path: str, metadata: Dict) -> bool:
pass
@abstractmethod
def get_image_path(self, flight_id: str, frame_id: int) -> Optional[str]:
pass
@abstractmethod
def get_image_metadata(self, flight_id: str, frame_id: int) -> Optional[Dict]:
pass
# Chunk State Operations
@abstractmethod
def save_chunk_state(self, flight_id: str, chunk: ChunkHandle) -> bool:
pass
@abstractmethod
def load_chunk_states(self, flight_id: str) -> List[ChunkHandle]:
pass
@abstractmethod
def delete_chunk_state(self, flight_id: str, chunk_id: str) -> bool:
pass
Component Description
Responsibilities
- Persistence layer.
- Consistency: Implements explicit transaction support to ensure waypoints and frame_results (and other related tables) stay synchronized when updated by F14 Result Manager.
- Direct database access layer for all flight-related data
- Execute SQL queries and commands
- Manage database connections and transactions
- Handle connection pooling and retry logic
- Provide database abstraction (PostgreSQL, MySQL, etc.)
- Persist flight state, waypoints, frame results
- Store heading history for rotation management
- Store image file paths and metadata
Transactional Integrity
- Atomic Updates: Critical for preventing partial state updates during chunk merges or frame refinements.
- Pattern: Usage of database transactions (BEGIN, COMMIT, ROLLBACK) for batch operations.
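The BEGIN / COMMIT / ROLLBACK pattern can be sketched as follows. This is a minimal illustration, not the actual implementation: it uses the standard-library sqlite3 module as a stand-in for PostgreSQL or MySQL, and it passes the connection explicitly to each operation, which is an assumption (the interface only specifies List[Callable]).

```python
import sqlite3
from typing import Callable, List


def execute_transaction(
    conn: sqlite3.Connection,
    operations: List[Callable[[sqlite3.Connection], object]],
) -> bool:
    """Run all operations atomically; roll everything back if any one fails."""
    conn.execute("BEGIN")
    try:
        for op in operations:
            op(conn)
        conn.execute("COMMIT")
        return True
    except sqlite3.Error:
        conn.execute("ROLLBACK")
        return False


# isolation_level=None disables implicit transactions, so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE waypoints (id TEXT PRIMARY KEY, lat REAL)")
conn.execute("CREATE TABLE frame_results (frame_id INTEGER PRIMARY KEY, gps_lat REAL)")

ok = execute_transaction(conn, [
    lambda c: c.execute("INSERT INTO waypoints VALUES ('w1', 48.1)"),
    lambda c: c.execute("INSERT INTO frame_results VALUES (1, 48.1)"),
])

# The second operation violates the primary key, so the first one is undone too.
failed = execute_transaction(conn, [
    lambda c: c.execute("INSERT INTO waypoints VALUES ('w2', 48.2)"),
    lambda c: c.execute("INSERT INTO frame_results VALUES (1, 48.2)"),
])

waypoint_count = conn.execute("SELECT COUNT(*) FROM waypoints").fetchone()[0]
```

After the failed call, only the waypoint from the first transaction remains, which is the behaviour F14 Result Manager relies on to keep waypoints and frame_results in step.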
Scope
- CRUD operations on flights table
- CRUD operations on waypoints table
- CRUD operations on geofences table
- Flight state persistence
- Frame result storage
- Heading history tracking
- Image metadata storage
- Query optimization for large datasets
Design Decision: Denormalization
The schema uses strategic denormalization to optimize for the most common access patterns:
Denormalized Fields:
- frame_results stores gps_lat, gps_lon directly (not as a foreign key to waypoints)
- flight_state duplicates frames_processed (could be computed from frame_results)
- chunks.frames is stored as a JSONB array (not normalized into a separate frame_chunk mapping table)
Rationale:
- Read-heavy workload: Frame results are read roughly 100x more often than they are written
- Avoids JOINs in critical path (per-frame processing)
- Simplifies chunk lifecycle (all chunk data in single row)
- Acceptable trade-off: Slightly increased storage, significantly faster reads
Flight Operations
insert_flight(flight: Flight) -> str
Description: Inserts a new flight with initial waypoints and geofences.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
Flight:
id: str
name: str
description: str
start_gps: GPSPoint
rough_waypoints: List[Waypoint]
geofences: Geofences
camera_params: CameraParameters
altitude: float
created_at: datetime
updated_at: datetime
Output:
flight_id: str
Database Operations:
- Begin transaction
- INSERT INTO flights
- INSERT INTO waypoints for each initial waypoint
- INSERT INTO geofences for each polygon
- INSERT INTO flight_state (initial state)
- Commit transaction
Error Conditions:
- IntegrityError: Duplicate flight_id
- DatabaseError: Connection error, transaction failure
- Automatic rollback on error
Test Cases:
- Insert flight with 100 waypoints: All data persisted
- Duplicate flight_id: Raises IntegrityError
- Transaction rollback: Error mid-insert → complete rollback
update_flight(flight: Flight) -> bool
Description: Updates flight metadata.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
Flight with updated fields
Output:
bool: True if updated, False if not found
Database Operations:
UPDATE flights
SET name = ?, description = ?, updated_at = ?
WHERE id = ?
Test Cases:
- Update existing flight: Returns True
- Update non-existent flight: Returns False
query_flights(filters: Dict[str, Any], limit: int, offset: int) -> List[Flight]
Description: Queries flights with filtering and pagination.
Called By:
- F02.1 Flight Lifecycle Manager (listing)
- F01 Flight API
Input:
filters: Dict[str, Any] # e.g., {"name": "Mission%", "status": "completed"}
limit: int
offset: int
Output:
List[Flight] # Metadata only, without full waypoint data
Test Cases:
- Filter by name: Returns matching flights
- Pagination: offset=100, limit=50 → returns flights 100-149
- No matches: Returns []
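One way to turn the filters dictionary into a parameterized query is sketched below. The whitelist, the table aliases, the join to flight_state for status, and the rule that a value containing % is treated as a LIKE pattern are all assumptions made for illustration; the spec only gives the example filter.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical whitelist: filter key -> qualified column. Keys outside it are rejected,
# so user input never reaches the SQL text itself.
FILTERABLE_COLUMNS = {"name": "f.name", "status": "s.status"}


def build_flight_query(
    filters: Dict[str, Any], limit: int, offset: int
) -> Tuple[str, List[Any]]:
    """Build a parameterized SELECT for flight metadata with pagination."""
    clauses: List[str] = []
    params: List[Any] = []
    for key, value in sorted(filters.items()):
        if key not in FILTERABLE_COLUMNS:
            raise ValueError(f"unsupported filter: {key}")
        # Assumption: a string containing '%' is a LIKE pattern, anything else is equality.
        op = "LIKE" if isinstance(value, str) and "%" in value else "="
        clauses.append(f"{FILTERABLE_COLUMNS[key]} {op} ?")
        params.append(value)
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    sql = (
        "SELECT f.id, f.name, f.description FROM flights f "
        "JOIN flight_state s ON s.flight_id = f.id"
        f"{where} ORDER BY f.created_at LIMIT ? OFFSET ?"
    )
    return sql, params + [limit, offset]


sql, params = build_flight_query(
    {"name": "Mission%", "status": "completed"}, limit=50, offset=100
)
```

Values travel only through the parameter list, in line with the parameterized-query requirement under Security.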
get_flight_by_id(flight_id: str) -> Optional[Flight]
Description: Retrieves complete flight with all waypoints.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
flight_id: str
Output:
Optional[Flight] # Complete flight with all waypoints
Database Operations:
- SELECT FROM flights WHERE id = ?
- SELECT FROM waypoints WHERE flight_id = ? ORDER BY timestamp
- SELECT FROM geofences WHERE flight_id = ?
- Assemble Flight object
Test Cases:
- Existing flight: Returns complete Flight
- Non-existent flight: Returns None
- Large flight (3000 waypoints): Returns within 150ms
delete_flight(flight_id: str) -> bool
Description: Deletes a flight and cascades to all related data.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
flight_id: str
Output:
bool: True if deleted, False if not found
Database Operations:
DELETE FROM flights WHERE id = ?
-- Cascade deletes via FK constraints:
-- waypoints, geofences, flight_state, frame_results,
-- heading_history, flight_images
Test Cases:
- Delete flight: Cascades to all related tables
- Non-existent flight: Returns False
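The cascade behaviour can be checked with a small sketch. It uses sqlite3 and a two-table subset of the schema as stand-ins; note that SQLite enforces foreign keys only after PRAGMA foreign_keys = ON, whereas PostgreSQL enforces them by default.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite-specific: FKs are off by default
conn.executescript("""
CREATE TABLE flights (id TEXT PRIMARY KEY, name TEXT NOT NULL);
CREATE TABLE waypoints (
    id TEXT PRIMARY KEY,
    flight_id TEXT NOT NULL REFERENCES flights(id) ON DELETE CASCADE
);
""")


def delete_flight(flight_id: str) -> bool:
    """Delete one flight; child rows are removed by the FK cascade."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute("DELETE FROM flights WHERE id = ?", (flight_id,))
    # rowcount counts only the directly deleted flights row, not cascaded rows.
    return cur.rowcount > 0


conn.execute("INSERT INTO flights VALUES ('f1', 'Mission 1')")
conn.executemany("INSERT INTO waypoints VALUES (?, 'f1')", [("w1",), ("w2",)])
conn.commit()

deleted = delete_flight("f1")
remaining = conn.execute("SELECT COUNT(*) FROM waypoints").fetchone()[0]
deleted_again = delete_flight("f1")
```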
Waypoint Operations
get_waypoints(flight_id: str, limit: Optional[int] = None) -> List[Waypoint]
Description: Retrieves waypoints for a flight.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
flight_id: str
limit: Optional[int]
Output:
List[Waypoint]
Test Cases:
- All waypoints: limit=None → returns all
- Limited: limit=100 → returns first 100
insert_waypoint(flight_id: str, waypoint: Waypoint) -> str
Description: Inserts a new waypoint.
Called By:
- F02.1 Flight Lifecycle Manager
Input:
flight_id: str
waypoint: Waypoint
Output:
waypoint_id: str
Test Cases:
- Valid insertion: Returns waypoint_id
- Non-existent flight: Raises ForeignKeyError
update_waypoint(flight_id: str, waypoint_id: str, waypoint: Waypoint) -> bool
Description: Updates a waypoint. Critical path for GPS refinement updates.
Called By:
- F02.1 Flight Lifecycle Manager
- F14 Result Manager
Input:
flight_id: str
waypoint_id: str
waypoint: Waypoint
Output:
bool: True if updated, False if not found
Database Operations:
UPDATE waypoints
SET lat = ?, lon = ?, altitude = ?, confidence = ?, refined = ?
WHERE id = ? AND flight_id = ?
Optimization:
- Prepared statement caching
- Connection pooling
- Indexed on (flight_id, id)
Test Cases:
- Update existing: Returns True
- Non-existent: Returns False
- High-frequency: 100 updates/sec sustained
batch_update_waypoints(flight_id: str, waypoints: List[Waypoint]) -> BatchResult
Description: Updates multiple waypoints in a single transaction.
Called By:
- F02.2 Flight Processing Engine (asynchronous refinements)
Input:
flight_id: str
waypoints: List[Waypoint]
Output:
BatchResult:
success: bool
updated_count: int
failed_ids: List[str]
Test Cases:
- Batch update 100: All succeed
- Partial failure: Returns failed_ids
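A possible shape for the batch update is sketched below. The spec does not say whether a partial failure should roll back the whole batch; this sketch assumes the successful updates are committed and the missing ids are reported. Waypoint is reduced to three fields and sqlite3 stands in for the real database.

```python
import sqlite3
from dataclasses import dataclass, field
from typing import List


@dataclass
class Waypoint:  # reduced stand-in for the real model
    id: str
    lat: float
    lon: float


@dataclass
class BatchResult:
    success: bool
    updated_count: int
    failed_ids: List[str] = field(default_factory=list)


def batch_update_waypoints(
    conn: sqlite3.Connection, flight_id: str, waypoints: List[Waypoint]
) -> BatchResult:
    """Apply all updates in one transaction and report ids that matched no row."""
    failed: List[str] = []
    updated = 0
    with conn:  # one transaction for the whole batch
        for wp in waypoints:
            cur = conn.execute(
                "UPDATE waypoints SET lat = ?, lon = ? WHERE id = ? AND flight_id = ?",
                (wp.lat, wp.lon, wp.id, flight_id),
            )
            if cur.rowcount == 0:
                failed.append(wp.id)
            else:
                updated += 1
    return BatchResult(success=not failed, updated_count=updated, failed_ids=failed)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE waypoints (id TEXT PRIMARY KEY, flight_id TEXT, lat REAL, lon REAL)"
)
conn.executemany("INSERT INTO waypoints VALUES (?, 'f1', 0.0, 0.0)", [("w1",), ("w2",)])
conn.commit()

result = batch_update_waypoints(conn, "f1", [
    Waypoint("w1", 48.1, 37.6),
    Waypoint("w2", 48.2, 37.7),
    Waypoint("w9", 0.0, 0.0),  # does not exist
])
```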
Flight State Operations
save_flight_state(flight_state: FlightState) -> bool
Description: Saves or updates flight processing state.
Called By:
- F02.2 Flight Processing Engine
Input:
FlightState:
flight_id: str
status: str
frames_processed: int
frames_total: int
current_frame: Optional[int]
blocked: bool
search_grid_size: Optional[int]
created_at: datetime
updated_at: datetime
Note: Heading is NOT stored in flight_state. Use the dedicated heading_history table via save_heading() and get_latest_heading() for all heading operations. This avoids data duplication and ensures a single source of truth for heading data.
Output:
bool: True if saved
Test Cases:
- Save state → persisted
- Update state → overwrites
load_flight_state(flight_id: str) -> Optional[FlightState]
Description: Loads flight state (for crash recovery).
Called By:
- F02.2 Flight Processing Engine
Output:
Optional[FlightState]
Test Cases:
- Load existing → returns state
- Load non-existent → returns None
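The save/overwrite and load/None behaviour can be sketched with an upsert. This is an illustration only: it keeps just two of the FlightState fields, returns a plain tuple rather than a model, and uses sqlite3 (whose ON CONFLICT ... DO UPDATE syntax matches PostgreSQL's) in place of the real database.

```python
import sqlite3
from typing import Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE flight_state (
    flight_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    frames_processed INTEGER NOT NULL DEFAULT 0
)""")


def save_flight_state(flight_id: str, status: str, frames_processed: int) -> bool:
    """Insert the state row, or overwrite it if the flight already has one."""
    with conn:
        conn.execute(
            """INSERT INTO flight_state (flight_id, status, frames_processed)
               VALUES (?, ?, ?)
               ON CONFLICT (flight_id) DO UPDATE SET
                   status = excluded.status,
                   frames_processed = excluded.frames_processed""",
            (flight_id, status, frames_processed),
        )
    return True


def load_flight_state(flight_id: str) -> Optional[Tuple[str, int]]:
    """Return (status, frames_processed), or None if no state was ever saved."""
    return conn.execute(
        "SELECT status, frames_processed FROM flight_state WHERE flight_id = ?",
        (flight_id,),
    ).fetchone()


save_flight_state("f1", "processing", 10)
save_flight_state("f1", "processing", 500)  # overwrites the earlier row
state = load_flight_state("f1")
missing = load_flight_state("unknown")
```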
query_processing_history(filters: Dict[str, Any]) -> List[FlightState]
Description: Queries historical processing data.
Called By:
- Analytics, admin tools
Test Cases:
- Query by date range → returns flights
- Query by status → returns filtered
Frame Result Operations
save_frame_result(flight_id: str, frame_result: FrameResult) -> bool
Description: Saves frame processing result.
Called By:
- F14 Result Manager
Input:
FrameResult:
frame_id: int
gps_center: GPSPoint
altitude: float
heading: float
confidence: float
refined: bool
timestamp: datetime
Output:
bool: True if saved
Test Cases:
- Save result → persisted
- Update on refinement → overwrites
get_frame_results(flight_id: str) -> List[FrameResult]
Description: Gets all frame results for flight.
Called By:
- F14 Result Manager
Test Cases:
- Get results → returns all frames
- No results → returns empty list
Heading History Operations
save_heading(flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool
Description: Saves heading value for temporal smoothing and recovery.
Called By:
- F02.2 Flight Processing Engine (after F06.update_heading() returns, F02 persists to F03)
Input:
flight_id: str
frame_id: int
heading: float # Degrees 0-360
timestamp: datetime
Output:
bool: True if saved
Test Cases:
- Save heading: Persisted correctly
- Overwrite heading: Same frame_id → updates value
get_heading_history(flight_id: str, last_n: Optional[int] = None) -> List[HeadingRecord]
Description: Retrieves heading history for smoothing calculations.
Called By:
- F06 Image Rotation Manager
Input:
flight_id: str
last_n: Optional[int] # Get last N headings, or all if None
Output:
List[HeadingRecord]:
- frame_id: int
- heading: float
- timestamp: datetime
Test Cases:
- Get all: Returns complete history
- Get last 10: Returns 10 most recent
get_latest_heading(flight_id: str) -> Optional[float]
Description: Gets most recent heading for pre-rotation.
Called By:
- F06 Image Rotation Manager
Output:
Optional[float]: Heading in degrees, or None if no history
Test Cases:
- Has history: Returns latest heading
- No history: Returns None
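The three heading operations can be sketched together. Two points are assumptions, since the spec leaves them open: "most recent" is taken to mean the highest frame_id, and history is returned oldest first. Records are plain tuples here rather than HeadingRecord, and sqlite3 stands in for the real database.

```python
import sqlite3
from datetime import datetime
from typing import List, Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE heading_history (
    flight_id TEXT NOT NULL,
    frame_id INTEGER NOT NULL,
    heading REAL NOT NULL,
    timestamp TEXT NOT NULL,
    PRIMARY KEY (flight_id, frame_id)
)""")


def save_heading(flight_id: str, frame_id: int, heading: float, timestamp: datetime) -> bool:
    """Insert a heading; a repeated (flight_id, frame_id) updates the value."""
    with conn:
        conn.execute(
            """INSERT INTO heading_history VALUES (?, ?, ?, ?)
               ON CONFLICT (flight_id, frame_id) DO UPDATE SET
                   heading = excluded.heading, timestamp = excluded.timestamp""",
            (flight_id, frame_id, heading, timestamp.isoformat()),
        )
    return True


def get_heading_history(flight_id: str, last_n: Optional[int] = None) -> List[Tuple[int, float]]:
    """Return (frame_id, heading) pairs, oldest first; last_n keeps only the newest N."""
    sql = ("SELECT frame_id, heading FROM heading_history "
           "WHERE flight_id = ? ORDER BY frame_id DESC")
    params: list = [flight_id]
    if last_n is not None:
        sql += " LIMIT ?"
        params.append(last_n)
    return conn.execute(sql, params).fetchall()[::-1]


def get_latest_heading(flight_id: str) -> Optional[float]:
    history = get_heading_history(flight_id, last_n=1)
    return history[0][1] if history else None


now = datetime(2024, 1, 1, 12, 0, 0)
for frame in range(1, 6):
    save_heading("f1", frame, frame * 10.0, now)
save_heading("f1", 5, 55.0, now)  # same frame_id: overwrites 50.0
```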
Image Storage Operations
save_image_metadata(flight_id: str, frame_id: int, file_path: str, metadata: Dict) -> bool
Description: Saves image file path and metadata (original filename, dimensions, etc.).
Called By:
- F05 Image Input Pipeline
Input:
flight_id: str
frame_id: int
file_path: str # Path where image is stored
metadata: Dict # {original_name, width, height, file_size, upload_time, ...}
Output:
bool: True if saved
Test Cases:
- Save metadata: Persisted with file_path
- Overwrite: Same frame_id → updates
get_image_path(flight_id: str, frame_id: int) -> Optional[str]
Description: Gets stored image file path.
Called By:
- F05 Image Input Pipeline
Output:
Optional[str]: File path or None
Test Cases:
- Exists: Returns file path
- Not exists: Returns None
get_image_metadata(flight_id: str, frame_id: int) -> Optional[Dict]
Description: Gets image metadata.
Called By:
- F05 Image Input Pipeline
Output:
Optional[Dict]: Metadata dictionary or None
Chunk State Operations
Necessity: These methods are required for crash recovery. Without them:
- Chunk state would be lost on system restart
- Processing would need to start from scratch
- Background chunk matching progress would be lost
F12 Route Chunk Manager delegates persistence to F03 to maintain separation of concerns (F12 manages chunk logic, F03 handles storage).
save_chunk_state(flight_id: str, chunk: ChunkHandle) -> bool
Description: Saves chunk state to database for crash recovery.
Called By:
- F12 Route Chunk Manager (after chunk state changes)
Input:
flight_id: str
chunk: ChunkHandle:
chunk_id: str
start_frame_id: int
end_frame_id: Optional[int]
frames: List[int]
is_active: bool
has_anchor: bool
anchor_frame_id: Optional[int]
anchor_gps: Optional[GPSPoint]
matching_status: str
Output:
bool: True if saved successfully
Database Operations:
INSERT INTO chunks (chunk_id, flight_id, start_frame_id, end_frame_id, frames,
is_active, has_anchor, anchor_frame_id, anchor_lat, anchor_lon,
matching_status, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
ON CONFLICT (chunk_id) DO UPDATE SET ...
Test Cases:
- Save new chunk: Persisted successfully
- Update existing chunk: State updated
- Multiple chunks: All persisted correctly
load_chunk_states(flight_id: str) -> List[ChunkHandle]
Description: Loads all chunk states for a flight (for crash recovery).
Called By:
- F12 Route Chunk Manager (on flight resume)
- System startup (recovery)
Input:
flight_id: str
Output:
List[ChunkHandle]: All chunks for the flight
Test Cases:
- Load chunks: Returns all chunks
- No chunks: Returns empty list
- Crash recovery: Chunks restored correctly
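A save/load round trip for chunk state might look like the sketch below. ChunkHandle is cut down to five fields, the frames list is serialized with json.dumps into a TEXT column (standing in for JSONB), and sqlite3 replaces the real database; all of these are simplifications for illustration.

```python
import json
import sqlite3
from dataclasses import dataclass
from typing import List


@dataclass
class ChunkHandle:  # reduced stand-in for the real model
    chunk_id: str
    start_frame_id: int
    frames: List[int]
    is_active: bool = True
    matching_status: str = "unanchored"


conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE chunks (
    chunk_id TEXT PRIMARY KEY,
    flight_id TEXT NOT NULL,
    start_frame_id INTEGER NOT NULL,
    frames TEXT NOT NULL,
    is_active INTEGER NOT NULL,
    matching_status TEXT NOT NULL
)""")


def save_chunk_state(flight_id: str, chunk: ChunkHandle) -> bool:
    """Upsert one chunk row; the whole frames list lives in a single column."""
    with conn:
        conn.execute(
            """INSERT INTO chunks VALUES (?, ?, ?, ?, ?, ?)
               ON CONFLICT (chunk_id) DO UPDATE SET
                   frames = excluded.frames,
                   is_active = excluded.is_active,
                   matching_status = excluded.matching_status""",
            (chunk.chunk_id, flight_id, chunk.start_frame_id,
             json.dumps(chunk.frames), int(chunk.is_active), chunk.matching_status),
        )
    return True


def load_chunk_states(flight_id: str) -> List[ChunkHandle]:
    """Rebuild every chunk of a flight, e.g. after a restart."""
    rows = conn.execute(
        "SELECT chunk_id, start_frame_id, frames, is_active, matching_status "
        "FROM chunks WHERE flight_id = ? ORDER BY start_frame_id",
        (flight_id,),
    ).fetchall()
    return [ChunkHandle(r[0], r[1], json.loads(r[2]), bool(r[3]), r[4]) for r in rows]


save_chunk_state("f1", ChunkHandle("c1", 1, [1, 2, 3]))
save_chunk_state("f1", ChunkHandle("c1", 1, [1, 2, 3, 4], matching_status="anchored"))
restored = load_chunk_states("f1")
```

Keeping frames in one column means a chunk is restored from a single row, which is the simplification the denormalization rationale refers to.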
delete_chunk_state(flight_id: str, chunk_id: str) -> bool
Description: Deletes chunk state from database.
Called By:
- F12 Route Chunk Manager (after chunk merged/deleted)
Input:
flight_id: str
chunk_id: str
Output:
bool: True if deleted
Test Cases:
- Delete chunk: Removed from database
- Non-existent chunk: Returns False
Integration Tests
Test 1: Complete Flight Lifecycle
- insert_flight() with 500 waypoints
- save_flight_state() with initial state
- update_waypoint() × 100
- save_frame_result() × 500
- save_heading() × 500
- get_flight_by_id() and verify all data
- delete_flight() and verify cascade
Test 2: High-Frequency Update Pattern
- insert_flight() with 2000 waypoints
- Concurrent: update_waypoint(), save_frame_result(), save_heading()
- Measure throughput > 200 updates/sec
- Verify all data persisted
Test 3: Crash Recovery
- Insert flight, process 500 frames
- Simulate crash (kill process)
- Restart, load_flight_state()
- Verify state intact, resume processing
Non-Functional Requirements
Performance
- insert_flight: < 200ms for 100 waypoints
- update_waypoint: < 30ms (critical path)
- get_flight_by_id: < 100ms for 2000 waypoints
- save_heading: < 10ms
- Throughput: 200+ operations per second
Scalability
- Connection pool: 50-100 connections
- Support 100+ concurrent flights
- Handle tables with millions of records
Reliability
- ACID transaction guarantees
- Automatic retry on transient errors (3 attempts)
- Connection health checks
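The retry requirement could be met with a small decorator along these lines. TransientDatabaseError is a hypothetical marker exception, and the exponential backoff is an assumption; the requirement itself only states three attempts.

```python
import time
from functools import wraps


class TransientDatabaseError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. a dropped connection)."""


def with_retry(attempts: int = 3, base_delay: float = 0.05):
    """Retry the wrapped call on transient errors, re-raising after the last attempt."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except TransientDatabaseError:
                    if attempt == attempts:
                        raise
                    # Assumed policy: wait base_delay, then 2x, 4x, ...
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


calls = {"count": 0}


@with_retry(attempts=3, base_delay=0.0)
def flaky_save() -> bool:
    """Fails twice, then succeeds, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientDatabaseError("connection reset")
    return True


result = flaky_save()
```

Non-transient errors such as IntegrityError are deliberately not caught, so a duplicate flight_id still fails immediately.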
Security
- SQL injection prevention (parameterized queries)
- Least privilege database permissions
- Connection string encryption
Dependencies
Internal Components
- None (lowest layer)
External Dependencies
- PostgreSQL or MySQL
- SQLAlchemy or psycopg2
- Alembic: Schema migrations
Database Schema
-- Flights table
CREATE TABLE flights (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
start_lat DECIMAL(10, 7) NOT NULL,
start_lon DECIMAL(11, 7) NOT NULL,
altitude DECIMAL(7, 2) NOT NULL,
camera_params JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_created_at (created_at),
INDEX idx_name (name)
);
-- Waypoints table
CREATE TABLE waypoints (
id VARCHAR(36) PRIMARY KEY,
flight_id VARCHAR(36) NOT NULL,
lat DECIMAL(10, 7) NOT NULL,
lon DECIMAL(11, 7) NOT NULL,
altitude DECIMAL(7, 2),
confidence DECIMAL(3, 2) NOT NULL,
timestamp TIMESTAMP NOT NULL,
refined BOOLEAN NOT NULL DEFAULT FALSE,
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
INDEX idx_flight_timestamp (flight_id, timestamp),
INDEX idx_flight_id (flight_id, id)
);
-- Geofences table
CREATE TABLE geofences (
id VARCHAR(36) PRIMARY KEY,
flight_id VARCHAR(36) NOT NULL,
nw_lat DECIMAL(10, 7) NOT NULL,
nw_lon DECIMAL(11, 7) NOT NULL,
se_lat DECIMAL(10, 7) NOT NULL,
se_lon DECIMAL(11, 7) NOT NULL,
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
INDEX idx_geofence_flight (flight_id)
);
-- Flight state table
-- NOTE: Heading is NOT stored here. Use heading_history table for heading data.
-- This avoids duplication and ensures single source of truth.
CREATE TABLE flight_state (
flight_id VARCHAR(36) PRIMARY KEY,
status VARCHAR(50) NOT NULL,
frames_processed INT NOT NULL DEFAULT 0,
frames_total INT NOT NULL DEFAULT 0,
current_frame INT,
blocked BOOLEAN NOT NULL DEFAULT FALSE,
search_grid_size INT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE
);
-- Frame results table
CREATE TABLE frame_results (
id VARCHAR(36) PRIMARY KEY,
flight_id VARCHAR(36) NOT NULL,
frame_id INT NOT NULL,
gps_lat DECIMAL(10, 7),
gps_lon DECIMAL(11, 7),
altitude FLOAT,
heading FLOAT,
confidence FLOAT,
refined BOOLEAN DEFAULT FALSE,
timestamp TIMESTAMP,
updated_at TIMESTAMP,
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
UNIQUE KEY (flight_id, frame_id),
INDEX idx_frame_flight (flight_id, frame_id)
);
-- Heading history table
CREATE TABLE heading_history (
flight_id VARCHAR(36) NOT NULL,
frame_id INT NOT NULL,
heading FLOAT NOT NULL,
timestamp TIMESTAMP NOT NULL,
PRIMARY KEY (flight_id, frame_id),
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
INDEX idx_heading_flight (flight_id, frame_id DESC)
);
-- Flight images table
CREATE TABLE flight_images (
flight_id VARCHAR(36) NOT NULL,
frame_id INT NOT NULL,
file_path VARCHAR(500) NOT NULL,
metadata JSONB,
uploaded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (flight_id, frame_id),
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
INDEX idx_images_flight (flight_id, frame_id)
);
-- Chunks table
CREATE TABLE chunks (
chunk_id VARCHAR(36) PRIMARY KEY,
flight_id VARCHAR(36) NOT NULL,
start_frame_id INT NOT NULL,
end_frame_id INT,
frames JSONB NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
has_anchor BOOLEAN NOT NULL DEFAULT FALSE,
anchor_frame_id INT,
anchor_lat DECIMAL(10, 7),
anchor_lon DECIMAL(11, 7),
matching_status VARCHAR(50) NOT NULL DEFAULT 'unanchored',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (flight_id) REFERENCES flights(id) ON DELETE CASCADE,
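-- Foreign key to ensure anchor_frame_id references a valid frame in flight_images.
-- Only anchor_frame_id is set to NULL on delete (column-list form, PostgreSQL 15+);
-- a plain SET NULL would also try to null flight_id, which is NOT NULL.
CONSTRAINT fk_anchor_frame
FOREIGN KEY (flight_id, anchor_frame_id)
REFERENCES flight_images(flight_id, frame_id)
ON DELETE SET NULL (anchor_frame_id),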
INDEX idx_chunks_flight (flight_id),
INDEX idx_chunks_active (flight_id, is_active),
INDEX idx_chunks_matching (flight_id, matching_status)
);
Data Models
Flight
class Flight(BaseModel):
id: str
name: str
description: str
start_gps: GPSPoint
waypoints: List[Waypoint]
geofences: Geofences
camera_params: CameraParameters
altitude: float
created_at: datetime
updated_at: datetime
FlightState
class FlightState(BaseModel):
flight_id: str
status: str
frames_processed: int
frames_total: int
current_frame: Optional[int]
blocked: bool
search_grid_size: Optional[int]
created_at: datetime
updated_at: datetime
# NOTE: Heading is NOT stored in FlightState.
# Use get_latest_heading(flight_id) from heading_history table.
FrameResult
class FrameResult(BaseModel):
frame_id: int
gps_center: GPSPoint
altitude: float
heading: float
confidence: float
refined: bool
timestamp: datetime
updated_at: datetime
HeadingRecord
class HeadingRecord(BaseModel):
frame_id: int
heading: float
timestamp: datetime
BatchResult
class BatchResult(BaseModel):
success: bool
updated_count: int
failed_ids: List[str]
DatabaseConfig
class DatabaseConfig(BaseModel):
host: str
port: int
database: str
username: str
password: str
pool_size: int = 50
max_overflow: int = 50
pool_timeout: int = 30
pool_recycle: int = 3600
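As a sketch of how this configuration might be consumed, the snippet below derives a connection URL and pool settings from it. It uses a standard-library dataclass in place of BaseModel so that it runs without extra packages; the url() and pool_kwargs() helpers, the driver string, and the example host and credentials are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict
from urllib.parse import quote_plus


@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    username: str
    password: str
    pool_size: int = 50
    max_overflow: int = 50
    pool_timeout: int = 30
    pool_recycle: int = 3600

    def url(self, driver: str = "postgresql+psycopg2") -> str:
        """Build a connection URL, escaping credentials that contain reserved characters."""
        return (
            f"{driver}://{quote_plus(self.username)}:{quote_plus(self.password)}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    def pool_kwargs(self) -> Dict[str, Any]:
        """Pool settings in the keyword form SQLAlchemy's create_engine accepts."""
        return {
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "pool_timeout": self.pool_timeout,
            "pool_recycle": self.pool_recycle,
        }


cfg = DatabaseConfig("db.internal", 5432, "flights", "svc", "p@ss/word")
max_connections = cfg.pool_size + cfg.max_overflow
```

With SQLAlchemy installed, the engine would then be created as create_engine(cfg.url(), **cfg.pool_kwargs()). The defaults give 50 pooled connections plus up to 50 overflow connections, which matches the 50-100 range under Scalability.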