add chunking

Oleksandr Bezdieniezhnykh
2025-11-27 03:43:19 +02:00
parent 4f8c18a066
commit 2037870f67
43 changed files with 7041 additions and 4135 deletions
@@ -0,0 +1,450 @@
# Image Input Pipeline
## Interface Definition
**Interface Name**: `IImageInputPipeline`
### Interface Methods
```python
from abc import ABC, abstractmethod
from typing import List, Optional


class IImageInputPipeline(ABC):
    @abstractmethod
    def queue_batch(self, flight_id: str, batch: ImageBatch) -> bool:
        pass

    @abstractmethod
    def process_next_batch(self, flight_id: str) -> Optional[ProcessedBatch]:
        pass

    @abstractmethod
    def validate_batch(self, batch: ImageBatch) -> ValidationResult:
        pass

    @abstractmethod
    def store_images(self, flight_id: str, images: List[ImageData]) -> bool:
        pass

    @abstractmethod
    def get_next_image(self, flight_id: str) -> Optional[ImageData]:
        pass

    @abstractmethod
    def get_image_by_sequence(self, flight_id: str, sequence: int) -> Optional[ImageData]:
        pass

    @abstractmethod
    def get_image_metadata(self, flight_id: str, sequence: int) -> Optional[ImageMetadata]:
        pass

    @abstractmethod
    def get_processing_status(self, flight_id: str) -> ProcessingStatus:
        pass
```
## Component Description
### Responsibilities
- Unified image ingestion, validation, storage, and retrieval
- FIFO batch queuing for processing
- Consecutive naming validation (AD000001, AD000002, etc.)
- Sequence integrity validation (strict sequential ordering)
- Image persistence with indexed retrieval
- Metadata extraction (EXIF, dimensions)
### Scope
- Batch queue management
- Image validation
- Disk storage management
- Sequential processing coordination
- Metadata management
## API Methods
### `queue_batch(flight_id: str, batch: ImageBatch) -> bool`
**Description**: Queues a batch of images for processing (FIFO).
**Called By**:
- F01 GPS-Denied REST API (after upload)
**Input**:
```python
flight_id: str
batch: ImageBatch
    images: List[bytes]    # Raw image data
    filenames: List[str]   # e.g., ["AD000101.jpg", "AD000102.jpg", ...]
    start_sequence: int    # 101
    end_sequence: int      # 150
```
**Output**:
```python
bool: True if queued successfully
```
**Processing Flow**:
1. Validate batch using H08 Batch Validator
2. Check sequence continuity (no gaps)
3. Add to FIFO queue for flight_id
4. Return immediately (async processing)
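The flow above can be sketched as a per-flight FIFO with a continuity check. This is a minimal in-memory illustration, not the component's implementation: `InMemoryBatchQueue`, the `max_batches` capacity, and the reduced `ImageBatch` dataclass are hypothetical, the H08 validation call is omitted, and the sketch assumes the first batch of a flight defines the starting sequence.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, List


class ValidationError(Exception):
    """Raised when a batch breaks sequence continuity."""


class QueueFullError(Exception):
    """Raised when the per-flight queue is at capacity."""


@dataclass
class ImageBatch:
    # Reduced stand-in for the spec's ImageBatch model.
    images: List[bytes]
    filenames: List[str]
    start_sequence: int
    end_sequence: int


class InMemoryBatchQueue:
    def __init__(self, max_batches: int = 8) -> None:
        # The capacity of 8 is an assumed value; the spec does not give one.
        self._max_batches = max_batches
        self._queues: Dict[str, Deque[ImageBatch]] = defaultdict(deque)
        self._next_expected: Dict[str, int] = {}

    def queue_batch(self, flight_id: str, batch: ImageBatch) -> bool:
        # Assumption: the first batch seen for a flight sets the start.
        expected = self._next_expected.get(flight_id, batch.start_sequence)
        if batch.start_sequence != expected:
            raise ValidationError(
                f"expected sequence {expected}, got {batch.start_sequence}"
            )
        queue = self._queues[flight_id]
        if len(queue) >= self._max_batches:
            raise QueueFullError(f"queue for {flight_id} is full")
        queue.append(batch)  # FIFO: append right, consume from the left
        self._next_expected[flight_id] = batch.end_sequence + 1
        return True
```

A rejected batch leaves the expected sequence unchanged, so the caller can resubmit the missing range and continue.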
**Error Conditions**:
- `ValidationError`: Sequence gap, invalid naming
- `QueueFullError`: Queue capacity exceeded
**Test Cases**:
1. **Valid batch**: Queued successfully
2. **Sequence gap**: Batch 101-150, expecting 51-100 → error
3. **Invalid naming**: Non-consecutive names → error
4. **Queue full**: Returns error with backpressure signal
---
### `process_next_batch(flight_id: str) -> Optional[ProcessedBatch]`
**Description**: Dequeues and processes the next batch from FIFO queue.
**Called By**:
- Internal processing loop (background worker)
**Input**:
```python
flight_id: str
```
**Output**:
```python
ProcessedBatch:
    images: List[ImageData]
    batch_id: str
    start_sequence: int
    end_sequence: int
```
**Processing Flow**:
1. Dequeue next batch
2. Decompress/decode images
3. Extract metadata (EXIF, dimensions)
4. Store images to disk
5. Return ProcessedBatch for pipeline
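The dequeue and decode steps above might look like the following sketch. It assumes the "log and skip" behaviour from the test cases below, takes the decoder as a parameter so it stays independent of any image library, and leaves out metadata extraction and storage. `toy_decode` is a hypothetical stand-in that only checks the JPEG start-of-image marker.

```python
import logging
from collections import deque
from typing import Any, Callable, Deque, List, Optional, Tuple

logger = logging.getLogger(__name__)

RawBatch = List[Tuple[str, bytes]]  # (filename, encoded bytes) pairs


class ImageCorruptionError(Exception):
    """Raised by a decoder when the payload is not a valid image."""


def toy_decode(payload: bytes) -> bytes:
    # Hypothetical decoder: accepts anything starting with the JPEG SOI marker.
    if not payload.startswith(b"\xff\xd8"):
        raise ImageCorruptionError("missing JPEG start-of-image marker")
    return payload


def process_next_batch(
    queue: Deque[RawBatch], decode: Callable[[bytes], Any]
) -> Optional[List[Tuple[str, Any]]]:
    if not queue:
        return None  # empty queue
    raw_batch = queue.popleft()  # oldest batch first
    decoded: List[Tuple[str, Any]] = []
    for filename, payload in raw_batch:
        try:
            decoded.append((filename, decode(payload)))
        except ImageCorruptionError:
            # Skip the bad image and keep the rest of the batch.
            logger.error("Skipping corrupted image %s", filename)
    return decoded
```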
**Error Conditions**:
- Returns `None`: Queue empty
- `ImageCorruptionError`: Invalid image data
**Test Cases**:
1. **Process batch**: Dequeues, returns ImageData list
2. **Empty queue**: Returns None
3. **Corrupted image**: Logs error, skips image
---
### `validate_batch(batch: ImageBatch) -> ValidationResult`
**Description**: Validates batch integrity and sequence continuity.
**Called By**:
- Internal (before queuing)
- H08 Batch Validator (delegated validation)
**Input**:
```python
batch: ImageBatch
```
**Output**:
```python
ValidationResult:
    valid: bool
    errors: List[str]
```
**Validation Rules**:
1. **Batch size**: 10 <= len(images) <= 50
2. **Naming convention**: ADxxxxxx.jpg (6 digits)
3. **Sequence continuity**: Consecutive numbers
4. **File format**: JPEG or PNG
5. **Image dimensions**: 640x480 to 6252x4168
6. **File size**: < 10MB per image
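Rules 1, 2, 3, and 6 can be checked from filenames and byte counts alone, as in the sketch below. Rules 4 and 5 need the image decoded and are left out. The function signature, the reduced `ValidationResult` dataclass, and reading "10MB" as 10 MiB are assumptions made for illustration.

```python
import re
from dataclasses import dataclass, field
from typing import List

FILENAME_RE = re.compile(r"^AD(\d{6})\.jpg$")
MIN_BATCH, MAX_BATCH = 10, 50
MAX_FILE_SIZE = 10 * 1024 * 1024  # assumption: "10MB" means 10 MiB


@dataclass
class ValidationResult:
    valid: bool
    errors: List[str] = field(default_factory=list)


def validate_batch(filenames: List[str], sizes: List[int]) -> ValidationResult:
    errors: List[str] = []

    # Rule 1: batch size
    if not MIN_BATCH <= len(filenames) <= MAX_BATCH:
        errors.append(f"batch size {len(filenames)} outside {MIN_BATCH}-{MAX_BATCH}")

    # Rule 2: naming convention, collecting sequence numbers as we go
    sequences: List[int] = []
    for name in filenames:
        match = FILENAME_RE.match(name)
        if match is None:
            errors.append(f"invalid filename: {name}")
        else:
            sequences.append(int(match.group(1)))

    # Rule 3: each number must follow the previous one directly
    for prev, cur in zip(sequences, sequences[1:]):
        if cur != prev + 1:
            errors.append(f"sequence break between {prev} and {cur}")

    # Rule 6: per-image size limit
    for name, size in zip(filenames, sizes):
        if size >= MAX_FILE_SIZE:
            errors.append(f"{name} is {size} bytes, limit is {MAX_FILE_SIZE}")

    return ValidationResult(valid=not errors, errors=errors)
```

Collecting every error instead of stopping at the first one lets the caller report all problems in a rejected upload at once.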
**Test Cases**:
1. **Valid batch**: Returns valid=True
2. **Too few images**: 5 images → invalid
3. **Too many images**: 60 images → invalid
4. **Non-consecutive**: AD000101, AD000103 → invalid
5. **Invalid naming**: IMG_0001.jpg → invalid
---
### `store_images(flight_id: str, images: List[ImageData]) -> bool`
**Description**: Persists images to disk with indexed storage.
**Called By**:
- Internal (after processing batch)
**Input**:
```python
flight_id: str
images: List[ImageData]
```
**Output**:
```python
bool: True if stored successfully
```
**Storage Structure**:
```
/image_storage/
    {flight_id}/
        AD000001.jpg
        AD000002.jpg
        metadata.json
```
**Processing Flow**:
1. Create the flight directory if it does not exist
2. Write each image to disk
3. Update metadata index
4. Persist to F17 Database Layer (metadata only)
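Steps 1 to 3 could be sketched with the standard library as follows. This is an illustration under several assumptions: images arrive as already-encoded bytes rather than `ImageData`, the index records only `file_size`, the storage root is passed in explicitly, failures are reported as `False` (matching the test cases) rather than as `StorageError`, and the F17 database write is omitted.

```python
import json
from pathlib import Path
from typing import Dict, List, Tuple


def store_images(root: Path, flight_id: str, images: List[Tuple[str, bytes]]) -> bool:
    flight_dir = root / flight_id
    index_path = flight_dir / "metadata.json"
    try:
        # Step 1: create the flight directory if needed
        flight_dir.mkdir(parents=True, exist_ok=True)

        # Load the existing index so earlier batches are preserved
        index: Dict[str, Dict[str, int]] = {}
        if index_path.exists():
            index = json.loads(index_path.read_text())

        # Step 2: write each image
        for filename, payload in images:
            (flight_dir / filename).write_bytes(payload)
            index[filename] = {"file_size": len(payload)}

        # Step 3: update the metadata index
        index_path.write_text(json.dumps(index, indent=2, sort_keys=True))
    except OSError:
        # Covers disk-full and permission errors
        return False
    return True
```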
**Error Conditions**:
- `StorageError`: Disk full, permission error
**Test Cases**:
1. **Store batch**: All images written successfully
2. **Disk full**: Returns False
3. **Verify storage**: Images retrievable after storage
---
### `get_next_image(flight_id: str) -> Optional[ImageData]`
**Description**: Gets the next image in sequence for processing.
**Called By**:
- F06 Image Rotation Manager
- F07 Sequential VO
- Processing pipeline (main loop)
**Input**:
```python
flight_id: str
```
**Output**:
```python
ImageData:
    flight_id: str
    sequence: int
    filename: str
    image: np.ndarray  # Loaded image
    metadata: ImageMetadata
```
**Processing Flow**:
1. Track current sequence number for flight
2. Load next image from disk
3. Increment sequence counter
4. Return ImageData
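One way to track the per-flight cursor described above is shown below. `SequentialReader` and its parameters are hypothetical, and the sketch returns raw bytes instead of a decoded `ImageData`. It also cannot tell "end of sequence" apart from "expected image missing"; distinguishing the two would need the flight's known image count.

```python
from pathlib import Path
from typing import Dict, Optional


class SequentialReader:
    def __init__(self, root: Path, first_sequence: int = 1) -> None:
        self._root = root
        self._first = first_sequence  # assumed starting sequence
        self._cursor: Dict[str, int] = {}  # flight_id -> next sequence to read

    def get_next_image(self, flight_id: str) -> Optional[bytes]:
        # Step 1: look up the current sequence number for the flight
        sequence = self._cursor.get(flight_id, self._first)
        path = self._root / flight_id / f"AD{sequence:06d}.jpg"
        if not path.exists():
            return None  # cursor stays put so a later call can retry
        # Step 2: load the image from disk
        data = path.read_bytes()
        # Step 3: advance the cursor only after a successful read
        self._cursor[flight_id] = sequence + 1
        return data
```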
**Error Conditions**:
- Returns `None`: No more images
- `ImageNotFoundError`: Expected image missing
**Test Cases**:
1. **Get sequential images**: Returns images in order
2. **End of sequence**: Returns None
3. **Missing image**: Handles gracefully
---
### `get_image_by_sequence(flight_id: str, sequence: int) -> Optional[ImageData]`
**Description**: Retrieves a specific image by sequence number.
**Called By**:
- F11 Failure Recovery Coordinator (for user fix)
- F13 Result Manager (for refinement)
**Input**:
```python
flight_id: str
sequence: int
```
**Output**:
```python
Optional[ImageData]
```
**Processing Flow**:
1. Construct filename from sequence (ADxxxxxx.jpg)
2. Load from disk
3. Load metadata
4. Return ImageData
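Step 1 is a zero-padded format of the sequence number. The helpers below are a small sketch of that mapping in both directions; the function names and the accepted range of 1 to 999999 are assumptions derived from the six-digit `ADxxxxxx.jpg` convention.

```python
import re
from typing import Optional

_NAME_RE = re.compile(r"^AD(\d{6})\.jpg$")


def sequence_to_filename(sequence: int) -> Optional[str]:
    # Six digits cannot represent anything above 999999.
    if not 1 <= sequence <= 999_999:
        return None
    return f"AD{sequence:06d}.jpg"


def filename_to_sequence(filename: str) -> Optional[int]:
    match = _NAME_RE.match(filename)
    return int(match.group(1)) if match else None
```

Returning `None` for an out-of-range sequence lines up with the "Invalid sequence: Returns None" test case.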
**Error Conditions**:
- Returns `None`: Image not found
**Test Cases**:
1. **Get specific image**: Returns correct image
2. **Invalid sequence**: Returns None
---
### `get_image_metadata(flight_id: str, sequence: int) -> Optional[ImageMetadata]`
**Description**: Retrieves metadata without loading full image (lightweight).
**Called By**:
- F02 Flight Manager (status checks)
- F13 Result Manager (metadata-only queries)
**Input**:
```python
flight_id: str
sequence: int
```
**Output**:
```python
ImageMetadata:
    sequence: int
    filename: str
    dimensions: Tuple[int, int]  # (width, height)
    file_size: int               # bytes
    timestamp: datetime
    exif_data: Optional[Dict]
```
**Test Cases**:
1. **Get metadata**: Returns quickly without loading image
2. **Missing image**: Returns None
---
### `get_processing_status(flight_id: str) -> ProcessingStatus`
**Description**: Gets current processing status for a flight.
**Called By**:
- F01 GPS-Denied REST API (status endpoint)
- F02 Flight Manager
**Input**:
```python
flight_id: str
```
**Output**:
```python
ProcessingStatus:
    flight_id: str
    total_images: int
    processed_images: int
    current_sequence: int
    queued_batches: int
    processing_rate: float  # images/second
```
**Test Cases**:
1. **Get status**: Returns accurate counts
2. **During processing**: Counts update in real time
## Integration Tests
### Test 1: Batch Processing Flow
1. queue_batch() with 50 images
2. process_next_batch() → returns batch
3. store_images() → persists to disk
4. get_next_image() × 50 → retrieves all sequentially
5. Verify metadata
### Test 2: Multiple Batches
1. queue_batch() × 5 (250 images total)
2. process_next_batch() × 5
3. Verify FIFO order maintained
4. Verify sequence continuity
### Test 3: Error Handling
1. Queue batch with sequence gap
2. Verify validation error
3. Queue valid batch → succeeds
4. Simulate disk full → storage fails gracefully
## Non-Functional Requirements
### Performance
- **queue_batch**: < 100ms
- **process_next_batch**: < 2 seconds for 50 images
- **get_next_image**: < 50ms
- **get_image_by_sequence**: < 50ms
- **Processing throughput**: 10-20 images/second
### Scalability
- Support 3000 images per flight
- Handle 10 concurrent flights
- Manage 100GB+ image storage
### Reliability
- Crash recovery (resume processing from last sequence)
- Atomic batch operations
- Data integrity validation
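Crash recovery needs the last processed sequence to survive a restart without ever being half-written. One common approach, shown here as a sketch and not as the component's chosen design, is to write a checkpoint to a temporary file and swap it into place with `os.replace`. The checkpoint file layout and function names are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_checkpoint(path: Path, last_sequence: int) -> None:
    # Write to a temp file in the same directory, then rename over the target.
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump({"last_sequence": last_sequence}, handle)
            handle.flush()
            os.fsync(handle.fileno())  # push the data to disk before the swap
        os.replace(tmp_name, path)  # readers see the old or the new file, never a partial one
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_checkpoint(path: Path) -> Optional[int]:
    try:
        return json.loads(path.read_text())["last_sequence"]
    except FileNotFoundError:
        return None  # no checkpoint yet: start from the beginning
```

On restart, a worker could call `load_checkpoint` and resume from the following sequence number.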
## Dependencies
### Internal Components
- **H08 Batch Validator**: For validation logic
- **F17 Database Layer**: For metadata persistence
### External Dependencies
- **opencv-python**: Image I/O
- **Pillow**: Image processing
- **numpy**: Image arrays
## Data Models
### ImageBatch
```python
class ImageBatch(BaseModel):
    images: List[bytes]
    filenames: List[str]
    start_sequence: int
    end_sequence: int
    batch_number: int
```
### ImageData
```python
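class ImageData(BaseModel):
    flight_id: str
    sequence: int
    filename: str
    # If BaseModel is pydantic's, an np.ndarray field needs
    # arbitrary_types_allowed enabled in the model config.
    image: np.ndarray
    metadata: ImageMetadata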
```
### ImageMetadata
```python
class ImageMetadata(BaseModel):
    sequence: int
    filename: str
    dimensions: Tuple[int, int]
    file_size: int
    timestamp: datetime
    exif_data: Optional[Dict]
```
### ProcessingStatus
```python
class ProcessingStatus(BaseModel):
    flight_id: str
    total_images: int
    processed_images: int
    current_sequence: int
    queued_batches: int
    processing_rate: float
```