component decomposition is done

This commit is contained in:
Oleksandr Bezdieniezhnykh
2025-11-24 14:09:23 +02:00
parent acec83018b
commit f50006d100
34 changed files with 8637 additions and 0 deletions
@@ -0,0 +1,101 @@
# Camera Model Helper
## Interface Definition
**Interface Name**: `ICameraModel`
### Interface Methods
```python
class ICameraModel(ABC):
    @abstractmethod
    def project(self, point_3d: np.ndarray, camera_params: CameraParameters) -> Tuple[float, float]:
        pass

    @abstractmethod
    def unproject(self, pixel: Tuple[float, float], depth: float, camera_params: CameraParameters) -> np.ndarray:
        pass

    @abstractmethod
    def get_focal_length(self, camera_params: CameraParameters) -> Tuple[float, float]:
        pass

    @abstractmethod
    def apply_distortion(self, pixel: Tuple[float, float], distortion_coeffs: List[float]) -> Tuple[float, float]:
        pass

    @abstractmethod
    def remove_distortion(self, pixel: Tuple[float, float], distortion_coeffs: List[float]) -> Tuple[float, float]:
        pass
```
## Component Description
Pinhole camera projection model with Brown-Conrady distortion handling.
## API Methods
### `project(point_3d: np.ndarray, camera_params: CameraParameters) -> Tuple[float, float]`
**Description**: Projects 3D point to 2D image pixel.
**Formula**:
```
x = fx * X/Z + cx
y = fy * Y/Z + cy
```
---
### `unproject(pixel: Tuple[float, float], depth: float, camera_params: CameraParameters) -> np.ndarray`
**Description**: Unprojects pixel to 3D ray at given depth.
**Formula**:
```
X = (x - cx) * depth / fx
Y = (y - cy) * depth / fy
Z = depth
```
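The two formulas above are inverses of each other. A minimal sketch (plain functions with hypothetical names, distortion ignored) shows the round trip:

```python
import numpy as np

def project(point_3d: np.ndarray, fx: float, fy: float, cx: float, cy: float):
    """Pinhole projection: 3D point in the camera frame -> pixel (no distortion)."""
    X, Y, Z = point_3d
    return (fx * X / Z + cx, fy * Y / Z + cy)

def unproject(pixel, depth: float, fx: float, fy: float, cx: float, cy: float) -> np.ndarray:
    """Inverse: pixel plus depth -> 3D point in the camera frame."""
    x, y = pixel
    return np.array([(x - cx) * depth / fx, (y - cy) * depth / fy, depth])

# Round trip: unprojecting at the original depth recovers the 3D point
p = np.array([1.0, 2.0, 10.0])
px = project(p, fx=1000.0, fy=1000.0, cx=320.0, cy=240.0)   # (420.0, 440.0)
p2 = unproject(px, 10.0, 1000.0, 1000.0, 320.0, 240.0)
```

In the real interface these intrinsics come from `CameraParameters` rather than loose arguments.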
---
### `get_focal_length(camera_params: CameraParameters) -> Tuple[float, float]`
**Description**: Returns (fx, fy) in pixels.
**Formula**:
```
fx = focal_length_mm * image_width / sensor_width_mm
fy = focal_length_mm * image_height / sensor_height_mm
```
---
### `apply_distortion(pixel: Tuple[float, float], distortion_coeffs: List[float]) -> Tuple[float, float]`
**Description**: Applies radial and tangential distortion (Brown-Conrady model).
---
### `remove_distortion(pixel: Tuple[float, float], distortion_coeffs: List[float]) -> Tuple[float, float]`
**Description**: Removes distortion from observed pixel.
## Dependencies
**External**: opencv-python, numpy
## Data Models
```python
class CameraParameters(BaseModel):
    focal_length: float                    # mm
    sensor_width: float                    # mm
    sensor_height: float                   # mm
    resolution_width: int
    resolution_height: int
    principal_point: Tuple[float, float]   # (cx, cy) pixels
    distortion_coefficients: List[float]   # [k1, k2, p1, p2, k3]
```
@@ -0,0 +1,78 @@
# GSD Calculator Helper
## Interface Definition
**Interface Name**: `IGSDCalculator`
### Interface Methods
```python
class IGSDCalculator(ABC):
    @abstractmethod
    def compute_gsd(self, altitude: float, camera_params: CameraParameters) -> float:
        pass

    @abstractmethod
    def altitude_to_scale(self, altitude: float, focal_length: float) -> float:
        pass

    @abstractmethod
    def meters_per_pixel(self, lat: float, zoom: int) -> float:
        pass

    @abstractmethod
    def gsd_from_camera(self, altitude: float, focal_length: float, sensor_width: float, image_width: int) -> float:
        pass
```
## Component Description
Ground Sampling Distance computations for altitude and coordinate systems.
## API Methods
### `compute_gsd(altitude: float, camera_params: CameraParameters) -> float`
**Description**: Computes GSD from altitude and camera parameters.
**Formula**:
```
GSD = (altitude * sensor_width) / (focal_length * image_width)
```
**Example**: altitude=800m, focal=24mm, sensor=36mm, width=6000px → GSD=0.2 m/pixel
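The formula is a direct ratio; the millimetre units of sensor width and focal length cancel. A one-function sketch reproducing the worked example:

```python
def compute_gsd(altitude_m: float, focal_mm: float, sensor_width_mm: float, image_width_px: int) -> float:
    """Ground sampling distance in meters/pixel (mm units cancel in the ratio)."""
    return (altitude_m * sensor_width_mm) / (focal_mm * image_width_px)

# The example above: 800 m altitude, 24 mm focal, 36 mm sensor, 6000 px width
gsd = compute_gsd(800, 24, 36, 6000)  # -> 0.2 m/pixel
```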
---
### `altitude_to_scale(altitude: float, focal_length: float) -> float`
**Description**: Converts altitude to scale factor for visual odometry (VO).
---
### `meters_per_pixel(lat: float, zoom: int) -> float`
**Description**: Computes GSD for Web Mercator tiles at zoom level.
**Formula**:
```
meters_per_pixel = 156543.03392 * cos(lat * π/180) / 2^zoom
```
**Example**: lat=48°N, zoom=19 → ~0.2 m/pixel
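Evaluating the formula directly (a small sketch with an assumed function name) gives roughly 0.2 m/pixel at 48°N; the well-known ~0.3 m/pixel figure for zoom 19 applies only at the equator, where cos(lat) = 1:

```python
import math

def meters_per_pixel(lat_deg: float, zoom: int) -> float:
    """Web Mercator ground resolution at a given latitude and zoom level."""
    return 156543.03392 * math.cos(math.radians(lat_deg)) / (2 ** zoom)

mpp = meters_per_pixel(48.0, 19)  # ~0.1998 m/pixel
```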
---
### `gsd_from_camera(altitude: float, focal_length: float, sensor_width: float, image_width: int) -> float`
**Description**: Direct GSD calculation from parameters.
## Dependencies
**External**: numpy
## Test Cases
1. Standard camera at 800m → GSD ~0.1-0.3 m/pixel
2. Web Mercator zoom 19 at Ukraine → ~0.2 m/pixel
@@ -0,0 +1,74 @@
# Robust Kernels Helper
## Interface Definition
**Interface Name**: `IRobustKernels`
### Interface Methods
```python
class IRobustKernels(ABC):
    @abstractmethod
    def huber_loss(self, error: float, threshold: float) -> float:
        pass

    @abstractmethod
    def cauchy_loss(self, error: float, k: float) -> float:
        pass

    @abstractmethod
    def compute_weight(self, error: float, kernel_type: str, params: Dict[str, float]) -> float:
        pass
```
## Component Description
Huber/Cauchy loss functions for outlier rejection in optimization.
## API Methods
### `huber_loss(error: float, threshold: float) -> float`
**Description**: Huber robust loss function.
**Formula**:
```
if |error| <= threshold:
    loss = 0.5 * error^2
else:
    loss = threshold * (|error| - 0.5 * threshold)
```
**Purpose**: Quadratic for small errors, linear for large errors (outliers).
---
### `cauchy_loss(error: float, k: float) -> float`
**Description**: Cauchy robust loss function.
**Formula**:
```
loss = (k^2 / 2) * log(1 + (error/k)^2)
```
**Purpose**: More aggressive outlier rejection than Huber.
---
### `compute_weight(error: float, kernel_type: str, params: Dict[str, float]) -> float`
**Description**: Computes robust weight for error.
**Usage**: Factor Graph applies weights to downweight outliers.
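A minimal sketch of the three methods, assuming the standard IRLS weight w(e) = ρ'(e)/e for each kernel (the parameter keys `"threshold"` and `"k"` are illustrative, not from the spec):

```python
import math

def huber_loss(error: float, threshold: float) -> float:
    """Quadratic inside the threshold, linear outside (per the formula above)."""
    a = abs(error)
    return 0.5 * error ** 2 if a <= threshold else threshold * (a - 0.5 * threshold)

def cauchy_loss(error: float, k: float) -> float:
    """Logarithmic growth: suppresses large residuals harder than Huber."""
    return (k ** 2 / 2) * math.log(1 + (error / k) ** 2)

def compute_weight(error: float, kernel_type: str, params: dict) -> float:
    """IRLS weight: ~1 for inliers, decaying toward 0 for outliers."""
    if kernel_type == "huber":
        t = params["threshold"]
        a = abs(error)
        return 1.0 if a <= t else t / a
    if kernel_type == "cauchy":
        return 1.0 / (1.0 + (error / params["k"]) ** 2)
    raise ValueError(f"Unknown kernel: {kernel_type}")

w_inlier = compute_weight(0.5, "huber", {"threshold": 1.345})   # 1.0
w_outlier = compute_weight(350.0, "cauchy", {"k": 100.0})       # heavily downweighted
```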
## Dependencies
**External**: numpy
## Test Cases
1. Small error → weight ≈ 1.0
2. Large error (350m outlier) → weight ≈ 0.1 (downweighted)
3. Huber vs Cauchy → Cauchy more aggressive
@@ -0,0 +1,84 @@
# Faiss Index Manager Helper
## Interface Definition
**Interface Name**: `IFaissIndexManager`
### Interface Methods
```python
class IFaissIndexManager(ABC):
    @abstractmethod
    def build_index(self, descriptors: np.ndarray, index_type: str) -> FaissIndex:
        pass

    @abstractmethod
    def add_descriptors(self, index: FaissIndex, descriptors: np.ndarray) -> bool:
        pass

    @abstractmethod
    def search(self, index: FaissIndex, query: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
        pass

    @abstractmethod
    def save_index(self, index: FaissIndex, path: str) -> bool:
        pass

    @abstractmethod
    def load_index(self, path: str) -> FaissIndex:
        pass
```
## Component Description
Manages Faiss indices for AnyLoc retrieval (IVF, HNSW options).
## API Methods
### `build_index(descriptors: np.ndarray, index_type: str) -> FaissIndex`
**Description**: Builds Faiss index from descriptors.
**Index Types**:
- **"IVF"**: Inverted File (fast for large databases)
- **"HNSW"**: Hierarchical Navigable Small World (best accuracy/speed trade-off)
- **"Flat"**: Brute force (exact, slow for large datasets)
**Input**: (N, D) descriptors array
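To make the "Flat" semantics concrete without requiring a Faiss install, here is a pure-NumPy sketch of the exact search a Flat (brute-force L2) index performs; with Faiss itself the equivalent is `faiss.IndexFlatL2(d)` followed by `index.add(xb)` and `index.search(xq, k)`:

```python
import numpy as np

def flat_search(database: np.ndarray, query: np.ndarray, k: int):
    """Exact k-NN by squared L2 distance, as a 'Flat' index computes it.
    database: (N, D), query: (M, D) -> (distances, indices), each (M, k)."""
    # Squared L2 via the expansion ||a - b||^2 = ||a||^2 - 2 a.b + ||b||^2
    d2 = (
        (query ** 2).sum(axis=1, keepdims=True)
        - 2.0 * query @ database.T
        + (database ** 2).sum(axis=1)
    )
    idx = np.argsort(d2, axis=1)[:, :k]
    return np.take_along_axis(d2, idx, axis=1), idx

rng = np.random.default_rng(0)
xb = rng.standard_normal((1000, 64)).astype(np.float32)   # database descriptors
xq = xb[:3] + 0.001                                       # queries near known rows
dist, idx = flat_search(xb, xq, k=5)
```

IVF and HNSW trade this exactness for speed on large databases, which is why they are preferred for retrieval at scale.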
---
### `add_descriptors(index: FaissIndex, descriptors: np.ndarray) -> bool`
**Description**: Adds more descriptors to existing index.
---
### `search(index: FaissIndex, query: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]`
**Description**: Searches for k nearest neighbors.
**Output**: (distances, indices), each of shape (num_queries, k)
---
### `save_index(index: FaissIndex, path: str) -> bool`
**Description**: Saves index to disk for fast startup.
---
### `load_index(path: str) -> FaissIndex`
**Description**: Loads pre-built index from disk.
## Dependencies
**External**: faiss-gpu or faiss-cpu
## Test Cases
1. Build index with 10,000 descriptors → succeeds
2. Search query → returns top-k matches
3. Save/load index → index restored correctly
@@ -0,0 +1,93 @@
# Performance Monitor Helper
## Interface Definition
**Interface Name**: `IPerformanceMonitor`
### Interface Methods
```python
class IPerformanceMonitor(ABC):
    @abstractmethod
    def start_timer(self, operation: str) -> str:
        pass

    @abstractmethod
    def end_timer(self, timer_id: str) -> float:
        pass

    @abstractmethod
    def get_statistics(self, operation: str) -> PerformanceStats:
        pass

    @abstractmethod
    def check_sla(self, operation: str, threshold: float) -> bool:
        pass

    @abstractmethod
    def get_bottlenecks(self) -> List[Tuple[str, float]]:
        pass
```
## Component Description
Tracks processing times, ensures <5s constraint per frame.
## API Methods
### `start_timer(operation: str) -> str`
**Description**: Starts timing an operation.
**Returns**: timer_id (UUID)
---
### `end_timer(timer_id: str) -> float`
**Description**: Ends timer and records duration.
**Returns**: Duration in seconds
---
### `get_statistics(operation: str) -> PerformanceStats`
**Description**: Gets statistics for an operation.
**Output**:
```python
PerformanceStats:
    operation: str
    count: int
    mean: float
    p50: float
    p95: float
    p99: float
    max: float
```
---
### `check_sla(operation: str, threshold: float) -> bool`
**Description**: Checks if operation meets SLA threshold.
**Example**: check_sla("frame_processing", 5.0) → True if < 5s
---
### `get_bottlenecks() -> List[Tuple[str, float]]`
**Description**: Returns slowest operations.
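A minimal in-memory sketch of the timer/SLA part of this interface (the percentile helper and its nearest-rank method are illustrative; the real component also reports mean, p50/p99, and bottlenecks):

```python
import time
import uuid

class PerformanceMonitor:
    """Wall-clock timers keyed by UUID, with per-operation duration samples."""

    def __init__(self):
        self._active = {}    # timer_id -> (operation, start time)
        self._samples = {}   # operation -> list of durations (seconds)

    def start_timer(self, operation: str) -> str:
        timer_id = str(uuid.uuid4())
        self._active[timer_id] = (operation, time.perf_counter())
        return timer_id

    def end_timer(self, timer_id: str) -> float:
        operation, start = self._active.pop(timer_id)
        duration = time.perf_counter() - start
        self._samples.setdefault(operation, []).append(duration)
        return duration

    def percentile(self, operation: str, q: float) -> float:
        # Nearest-rank percentile over recorded samples
        data = sorted(self._samples[operation])
        return data[min(len(data) - 1, int(q * len(data)))]

    def check_sla(self, operation: str, threshold: float) -> bool:
        # SLA holds if the 95th-percentile duration is under the threshold
        return self.percentile(operation, 0.95) < threshold

mon = PerformanceMonitor()
tid = mon.start_timer("frame_processing")
elapsed = mon.end_timer(tid)
ok = mon.check_sla("frame_processing", 5.0)
```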
## Dependencies
**External**: time, statistics
## Test Cases
1. Start/end timer → records duration
2. Get statistics → returns percentiles
3. Check SLA → returns True if meeting targets
@@ -0,0 +1,94 @@
# Web Mercator Utils Helper
## Interface Definition
**Interface Name**: `IWebMercatorUtils`
### Interface Methods
```python
class IWebMercatorUtils(ABC):
    @abstractmethod
    def latlon_to_tile(self, lat: float, lon: float, zoom: int) -> Tuple[int, int]:
        pass

    @abstractmethod
    def tile_to_latlon(self, x: int, y: int, zoom: int) -> Tuple[float, float]:
        pass

    @abstractmethod
    def compute_tile_bounds(self, x: int, y: int, zoom: int) -> TileBounds:
        pass

    @abstractmethod
    def get_zoom_gsd(self, lat: float, zoom: int) -> float:
        pass
```
## Component Description
Web Mercator projection (EPSG:3857) for tile coordinates. Used for Google Maps tiles.
## API Methods
### `latlon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]`
**Description**: Converts GPS to tile coordinates.
**Formula**:
```
n = 2^zoom
x = floor((lon + 180) / 360 * n)
lat_rad = lat * π / 180
y = floor((1 - log(tan(lat_rad) + sec(lat_rad)) / π) / 2 * n)
```
**Returns**: (x, y) tile coordinates
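The formula above is the standard slippy-map tiling scheme; a direct transcription (note `tan + sec` is `asinh(tan)` in closed form):

```python
import math

def latlon_to_tile(lat: float, lon: float, zoom: int):
    """Web Mercator (slippy-map) tile indices for a GPS coordinate."""
    n = 2 ** zoom
    x = int((lon + 180.0) / 360.0 * n)
    lat_rad = math.radians(lat)
    y = int((1.0 - math.log(math.tan(lat_rad) + 1.0 / math.cos(lat_rad)) / math.pi) / 2.0 * n)
    return x, y

# At zoom 1 the world is a 2x2 grid; (0, 0) falls in the south-east tile of center
tile = latlon_to_tile(0.0, 0.0, zoom=1)  # -> (1, 1)
```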
---
### `tile_to_latlon(x: int, y: int, zoom: int) -> Tuple[float, float]`
**Description**: Converts tile coordinates to GPS (NW corner).
**Formula** (inverse of above)
---
### `compute_tile_bounds(x: int, y: int, zoom: int) -> TileBounds`
**Description**: Computes GPS bounding box of tile.
**Returns**:
```python
TileBounds:
    nw: GPSPoint      # North-West
    ne: GPSPoint      # North-East
    sw: GPSPoint      # South-West
    se: GPSPoint      # South-East
    center: GPSPoint
    gsd: float
```
---
### `get_zoom_gsd(lat: float, zoom: int) -> float`
**Description**: Gets GSD for zoom level at latitude.
**Formula**:
```
gsd = 156543.03392 * cos(lat * π/180) / 2^zoom
```
## Dependencies
**External**: numpy
## Test Cases
1. GPS to tile at zoom 19 → valid tile coords
2. Tile to GPS → inverse correct
3. Compute bounds → 4 corners valid
4. GSD at zoom 19, Ukraine → ~0.2 m/pixel
@@ -0,0 +1,92 @@
# Image Rotation Utils Helper
## Interface Definition
**Interface Name**: `IImageRotationUtils`
### Interface Methods
```python
class IImageRotationUtils(ABC):
    @abstractmethod
    def rotate_image(self, image: np.ndarray, angle: float, center: Optional[Tuple[int, int]] = None) -> np.ndarray:
        pass

    @abstractmethod
    def calculate_rotation_from_points(self, src_points: np.ndarray, dst_points: np.ndarray) -> float:
        pass

    @abstractmethod
    def normalize_angle(self, angle: float) -> float:
        pass

    @abstractmethod
    def compute_rotation_matrix(self, angle: float, center: Tuple[int, int]) -> np.ndarray:
        pass
```
## Component Description
Image rotation operations, angle calculations from point shifts.
## API Methods
### `rotate_image(image: np.ndarray, angle: float, center: Optional[Tuple[int, int]] = None) -> np.ndarray`
**Description**: Rotates image around center.
**Implementation**: Uses cv2.getRotationMatrix2D + cv2.warpAffine
**Parameters**:
- **angle**: Degrees (0-360)
- **center**: Rotation center (default: image center)
**Returns**: Rotated image (same dimensions)
---
### `calculate_rotation_from_points(src_points: np.ndarray, dst_points: np.ndarray) -> float`
**Description**: Calculates rotation angle from point correspondences.
**Input**: (N, 2) arrays of matching points
**Algorithm**:
1. Compute centroids
2. Calculate angle from centroid shifts
3. Return angle in degrees
**Use Case**: Extract precise angle from LiteSAM homography
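One common way to implement the steps above is to center both point sets and take the `atan2` of the summed cross and dot products, which gives the least-squares rotation angle (a sketch under that assumption, not necessarily the component's exact method):

```python
import numpy as np

def calculate_rotation_from_points(src: np.ndarray, dst: np.ndarray) -> float:
    """Rotation angle in degrees between matched (N, 2) point sets."""
    a = src - src.mean(axis=0)          # step 1: center on centroids
    b = dst - dst.mean(axis=0)
    # step 2: angle from summed cross/dot products of the centered pairs
    cross = np.sum(a[:, 0] * b[:, 1] - a[:, 1] * b[:, 0])
    dot = np.sum(a[:, 0] * b[:, 0] + a[:, 1] * b[:, 1])
    return float(np.degrees(np.arctan2(cross, dot)))  # step 3: degrees

# Recover a known 30° rotation
theta = np.radians(30.0)
R = np.array([[np.cos(theta), -np.sin(theta)],
              [np.sin(theta),  np.cos(theta)]])
src = np.array([[0.0, 0.0], [4.0, 1.0], [2.0, 5.0], [7.0, 3.0]])
dst = src @ R.T
angle = calculate_rotation_from_points(src, dst)  # ≈ 30.0
```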
---
### `normalize_angle(angle: float) -> float`
**Description**: Normalizes angle to 0-360 range.
**Formula**:
```
angle = angle % 360
if angle < 0:
    angle += 360
```
---
### `compute_rotation_matrix(angle: float, center: Tuple[int, int]) -> np.ndarray`
**Description**: Computes 2D rotation matrix.
**Returns**: 2×3 affine transformation matrix
## Dependencies
**External**: opencv-python, numpy
## Test Cases
1. Rotate 90° → image rotated correctly
2. Calculate angle from points → accurate angle
3. Normalize 370° → 10°
4. Rotation matrix → correct transformation
@@ -0,0 +1,329 @@
# Batch Validator Helper
## Interface Definition
**Interface Name**: `IBatchValidator`
### Interface Methods
```python
class IBatchValidator(ABC):
    @abstractmethod
    def validate_batch_size(self, batch: ImageBatch) -> ValidationResult:
        pass

    @abstractmethod
    def check_sequence_continuity(self, batch: ImageBatch, expected_start: int) -> ValidationResult:
        pass

    @abstractmethod
    def validate_naming_convention(self, filenames: List[str]) -> ValidationResult:
        pass

    @abstractmethod
    def validate_format(self, image_data: bytes) -> ValidationResult:
        pass
```
## Component Description
### Responsibilities
- Validate image batch integrity
- Check sequence continuity and naming conventions
- Validate image format and dimensions
- Ensure batch size constraints (10-50 images)
- Support strict sequential ordering (ADxxxxxx.jpg)
### Scope
- Batch validation for G05 Image Input Pipeline
- Image format validation
- Filename pattern matching
- Sequence gap detection
## API Methods
### `validate_batch_size(batch: ImageBatch) -> ValidationResult`
**Description**: Validates batch contains 10-50 images.
**Called By**:
- G05 Image Input Pipeline (before queuing)
**Input**:
```python
batch: ImageBatch:
    images: List[bytes]
    filenames: List[str]
    start_sequence: int
    end_sequence: int
```
**Output**:
```python
ValidationResult:
valid: bool
errors: List[str]
```
**Validation Rules**:
- **Minimum batch size**: 10 images
- **Maximum batch size**: 50 images
- **Reason**: Balance between upload overhead and processing granularity
**Error Conditions**:
- Returns `valid=False` with error message (not an exception)
**Test Cases**:
1. **Valid batch (20 images)**: Returns `valid=True`
2. **Too few images (5)**: Returns `valid=False`, error="Batch size 5 below minimum 10"
3. **Too many images (60)**: Returns `valid=False`, error="Batch size 60 exceeds maximum 50"
4. **Empty batch**: Returns `valid=False`
---
### `check_sequence_continuity(batch: ImageBatch, expected_start: int) -> ValidationResult`
**Description**: Validates images form consecutive sequence with no gaps.
**Called By**:
- G05 Image Input Pipeline (before queuing)
**Input**:
```python
batch: ImageBatch
expected_start: int # Expected starting sequence number
```
**Output**:
```python
ValidationResult:
valid: bool
errors: List[str]
```
**Validation Rules**:
1. **Sequence starts at expected_start**: First image sequence == expected_start
2. **Consecutive numbers**: No gaps in sequence (AD000101, AD000102, AD000103, ...)
3. **Filename extraction**: Parse sequence from ADxxxxxx.jpg pattern
4. **Strict ordering**: Images must be in sequential order
**Algorithm**:
```python
sequences = [extract_sequence(filename) for filename in batch.filenames]
if sequences[0] != expected_start:
    return invalid(f"Expected start {expected_start}, got {sequences[0]}")
for i in range(len(sequences) - 1):
    if sequences[i + 1] != sequences[i] + 1:
        return invalid(f"Gap detected: {sequences[i]} -> {sequences[i + 1]}")
return valid()
```
**Error Conditions**:
- Returns `valid=False` with specific gap information
**Test Cases**:
1. **Valid sequence (101-150)**: expected_start=101 → valid=True
2. **Wrong start**: expected_start=101, got 102 → valid=False
3. **Gap in sequence**: AD000101, AD000103 (missing 102) → valid=False
4. **Out of order**: AD000102, AD000101 → valid=False
---
### `validate_naming_convention(filenames: List[str]) -> ValidationResult`
**Description**: Validates filenames match ADxxxxxx.jpg pattern.
**Called By**:
- Internal (during check_sequence_continuity)
- G05 Image Input Pipeline
**Input**:
```python
filenames: List[str]
```
**Output**:
```python
ValidationResult:
valid: bool
errors: List[str]
```
**Validation Rules**:
1. **Pattern**: `AD\d{6}\.(jpg|JPG|png|PNG)`
2. **Examples**: AD000001.jpg, AD000237.JPG, AD002000.png
3. **Case variants**: Accepts .jpg, .JPG, .png, .PNG; mixed case such as .Jpg does not match the pattern
4. **6 digits required**: Zero-padded to 6 digits
**Regex Pattern**: `^AD\d{6}\.(jpg|JPG|png|PNG)$`
**Error Conditions**:
- Returns `valid=False` listing invalid filenames
**Test Cases**:
1. **Valid names**: ["AD000001.jpg", "AD000002.jpg"] → valid=True
2. **Invalid prefix**: "IMG_0001.jpg" → valid=False
3. **Wrong digit count**: "AD001.jpg" (3 digits) → valid=False
4. **Missing extension**: "AD000001" → valid=False
5. **Invalid extension**: "AD000001.bmp" → valid=False
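The test cases above follow directly from the spec's regex; a small sketch returning the `(valid, errors)` pair in the `ValidationResult` shape (the error-message wording is illustrative):

```python
import re

PATTERN = re.compile(r"^AD\d{6}\.(jpg|JPG|png|PNG)$")

def validate_naming_convention(filenames):
    """Returns (valid, errors) for a list of filenames."""
    errors = [f"Invalid filename: {name}"
              for name in filenames if not PATTERN.match(name)]
    return (len(errors) == 0, errors)

ok, _ = validate_naming_convention(["AD000001.jpg", "AD000237.JPG"])   # (True, [])
bad, errs = validate_naming_convention(["IMG_0001.jpg", "AD001.jpg"])  # invalid, 2 errors
```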
---
### `validate_format(image_data: bytes) -> ValidationResult`
**Description**: Validates image file format and properties.
**Called By**:
- G05 Image Input Pipeline (per-image validation)
**Input**:
```python
image_data: bytes # Raw image file bytes
```
**Output**:
```python
ValidationResult:
valid: bool
errors: List[str]
```
**Validation Rules**:
1. **Format**: Valid JPEG or PNG
2. **Dimensions**: 640×480 to 6252×4168 pixels
3. **File size**: < 10MB per image
4. **Image readable**: Not corrupted
5. **Color channels**: RGB (3 channels)
**Algorithm**:
```python
from io import BytesIO
from PIL import Image

try:
    image = Image.open(BytesIO(image_data))
    width, height = image.size
    if image.format not in ("JPEG", "PNG"):
        return invalid("Format must be JPEG or PNG")
    if width < 640 or height < 480:
        return invalid("Dimensions too small")
    if width > 6252 or height > 4168:
        return invalid("Dimensions too large")
    if len(image_data) > 10 * 1024 * 1024:
        return invalid("File size exceeds 10MB")
    if image.mode != "RGB":
        return invalid("Expected 3-channel RGB image")
    return valid()
except Exception as e:
    return invalid(f"Corrupted image: {e}")
```
**Error Conditions**:
- Returns `valid=False` with specific error
**Test Cases**:
1. **Valid JPEG (2048×1536)**: valid=True
2. **Valid PNG (6252×4168)**: valid=True
3. **Too small (320×240)**: valid=False
4. **Too large (8000×6000)**: valid=False
5. **File too big (15MB)**: valid=False
6. **Corrupted file**: valid=False
7. **BMP format**: valid=False
## Integration Tests
### Test 1: Complete Batch Validation
1. Create batch with 20 images, AD000101.jpg - AD000120.jpg
2. validate_batch_size() → valid
3. validate_naming_convention() → valid
4. check_sequence_continuity(expected_start=101) → valid
5. validate_format() for each image → all valid
### Test 2: Invalid Batch Detection
1. Create batch with 60 images → validate_batch_size() fails
2. Create batch with gap (AD000101, AD000103) → check_sequence_continuity() fails
3. Create batch with IMG_0001.jpg → validate_naming_convention() fails
4. Create batch with corrupted image → validate_format() fails
### Test 3: Edge Cases
1. Batch with exactly 10 images → valid
2. Batch with exactly 50 images → valid
3. Batch with 51 images → invalid
4. Batch starting at AD999995.jpg (near max) → valid
## Non-Functional Requirements
### Performance
- **validate_batch_size**: < 1ms
- **check_sequence_continuity**: < 10ms for 50 images
- **validate_naming_convention**: < 5ms for 50 filenames
- **validate_format**: < 20ms per image (with PIL)
- **Total batch validation**: < 100ms for 50 images
### Reliability
- Never raises exceptions (returns ValidationResult with errors)
- Handles edge cases gracefully
- Clear, actionable error messages
### Maintainability
- Configurable validation rules (min/max batch size, dimensions)
- Easy to add new validation rules
- Comprehensive error reporting
## Dependencies
### Internal Components
- None (pure utility, no internal dependencies)
### External Dependencies
- **Pillow (PIL)**: Image format validation and dimension checking
- **re** (regex): Filename pattern matching
## Data Models
### ImageBatch
```python
class ImageBatch(BaseModel):
    images: List[bytes]      # Raw image data
    filenames: List[str]     # e.g., ["AD000101.jpg", ...]
    start_sequence: int      # 101
    end_sequence: int        # 150
    batch_number: int        # Sequential batch number
```
### ValidationResult
```python
class ValidationResult(BaseModel):
    valid: bool
    errors: List[str] = []    # Empty if valid
    warnings: List[str] = []  # Optional warnings
```
### ValidationRules (Configuration)
```python
class ValidationRules(BaseModel):
    min_batch_size: int = 10
    max_batch_size: int = 50
    min_width: int = 640
    min_height: int = 480
    max_width: int = 6252
    max_height: int = 4168
    max_file_size_mb: int = 10
    allowed_formats: List[str] = ["JPEG", "PNG"]
    filename_pattern: str = r"^AD\d{6}\.(jpg|JPG|png|PNG)$"
```
### Sequence Extraction
```python
import re

def extract_sequence(filename: str) -> int:
    """
    Extracts the sequence number from a filename.
    Example: "AD000237.jpg" -> 237
    """
    match = re.match(r"AD(\d{6})\.", filename)
    if match:
        return int(match.group(1))
    raise ValueError(f"Invalid filename format: {filename}")
```