Files
gps-denied-onboard/test_07_image_input_pipeline_integration.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

233 lines
9.8 KiB
Python

import pytest
# Module-level skip: with allow_module_level=True, pytest.skip() aborts collection
# of this whole file and reports it as skipped. It is placed ahead of the remaining
# imports, presumably so that they are never executed for this obsolete module --
# NOTE(review): confirm before reordering the imports below.
pytest.skip("Obsolete test file replaced by component-specific unit tests", allow_module_level=True)
import os
import cv2
import numpy as np
import time
from f05_image_input_pipeline import ImageInputPipeline, ImageBatch
from h08_batch_validator import BatchValidator
@pytest.fixture
def validator():
    """Build an H08 BatchValidator whose minimum batch size is relaxed for isolated tests.

    Returns:
        A ``BatchValidator`` instance with ``min_batch_size`` overridden to 1.
    """
    batch_validator = BatchValidator()
    # Lowered from 10 so that single-image test cases pass validation.
    batch_validator.min_batch_size = 1
    return batch_validator
@pytest.fixture
def pipeline(tmp_path, validator):
    """Create a fresh F05 ImageInputPipeline that stores data in pytest's temporary directory.

    Args:
        tmp_path: pytest's built-in per-test temporary directory fixture.
        validator: The batch validator fixture handed to the pipeline.

    Returns:
        An ``ImageInputPipeline`` constructed with the temporary storage directory.
    """
    # The pipeline is given the directory as a plain string rather than a Path object.
    storage_root = str(tmp_path)
    return ImageInputPipeline(storage_dir=storage_root, validator=validator)
def generate_image_bytes(width=6252, height=4168, ext='.jpg', corrupt=False):
    """Return encoded bytes of a solid-black test image, or a deliberately invalid payload.

    Args:
        width: Image width in pixels (defaults to 6252).
        height: Image height in pixels (defaults to 4168).
        ext: File extension passed to ``cv2.imencode`` to select the encoder.
        corrupt: When true, skip encoding and return bytes that are not a valid image.

    Returns:
        The encoded image as ``bytes``, or a fixed invalid byte string when
        ``corrupt`` is set.

    Raises:
        ValueError: If OpenCV reports that the image could not be encoded.
    """
    if corrupt:
        return b"This is not a valid JPEG or PNG file header..."
    # Create a solid black image to save memory during tests while preserving dimensions
    img = np.zeros((height, width, 3), dtype=np.uint8)
    # imencode returns (success_flag, buffer); do not hand back a buffer from a failed encode.
    success, buffer = cv2.imencode(ext, img)
    if not success:
        raise ValueError(f"cv2.imencode failed to encode a {width}x{height} image as {ext!r}")
    return buffer.tobytes()
class TestImageInputPipelineIntegration:
    """
    Implements the Integration Tests defined in:
    docs/03_tests/07_image_input_pipeline_integration_spec.md

    Every test receives the ``pipeline`` fixture and builds its input batches
    with ``generate_image_bytes`` and ``ImageBatch``.
    """

    def test_tc1_standard_resolution_image(self, pipeline):
        """Test Case 1: Standard Resolution Image (26MP / 6252x4168)."""
        flight_id = "tc1_flight"
        img_bytes = generate_image_bytes(width=6252, height=4168)
        batch = ImageBatch(
            images=[img_bytes],
            filenames=["AD000001.jpg"],
            start_sequence=1,
            end_sequence=1,
            batch_number=1,
        )

        # Action: the timed window covers both queueing and processing.
        start_time = time.perf_counter()
        assert pipeline.queue_batch(flight_id, batch) is True
        processed = pipeline.process_next_batch(flight_id)
        load_time_ms = (time.perf_counter() - start_time) * 1000

        # Assertions
        assert processed is not None
        assert len(processed) == 1
        img_data = processed[0]
        assert img_data.metadata.dimensions == (6252, 4168)
        assert img_data.filename == "AD000001.jpg"
        assert load_time_ms < 500.0  # Timing criterion

    def test_tc2_batch_loading(self, pipeline):
        """Test Case 2: Batch Loading (10 images)."""
        flight_id = "tc2_flight"
        batch_size = 10
        images = [generate_image_bytes(width=1920, height=1080) for _ in range(batch_size)]
        filenames = [f"AD{i:06d}.jpg" for i in range(1, batch_size + 1)]
        batch = ImageBatch(
            images=images, filenames=filenames,
            start_sequence=1, end_sequence=batch_size, batch_number=1,
        )

        start_time = time.perf_counter()
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)
        load_time_ms = (time.perf_counter() - start_time) * 1000

        assert processed is not None
        assert len(processed) == batch_size
        assert load_time_ms < 5000.0  # Total loading time < 5 seconds

        # Verify sequential order is maintained (sequence numbers start at 1).
        # The length check above guarantees zip() does not truncate silently.
        for expected_sequence, (item, expected_name) in enumerate(zip(processed, filenames), start=1):
            assert item.sequence == expected_sequence
            assert item.filename == expected_name

    def test_tc3_and_tc4_resizing_and_normalization(self, pipeline):
        """Test Case 3 & 4: Image Resizing and Float32 Pixel Normalization."""
        flight_id = "tc3_flight"
        img_bytes = generate_image_bytes(width=6252, height=4168)
        batch = ImageBatch(
            images=[img_bytes], filenames=["AD000015.jpg"],
            start_sequence=15, end_sequence=15, batch_number=1,
        )
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)
        # Fail with a clear assertion instead of a TypeError when nothing was processed.
        assert processed is not None
        original_img = processed[0].image

        # Simulate F07/F08 Preprocessing Pipeline target resolutions
        target_l1 = (1024, 683)  # VO Front-end, given as (width, height)

        # Resizing (TC3)
        start_resize = time.perf_counter()
        resized_img = cv2.resize(original_img, target_l1, interpolation=cv2.INTER_AREA)
        resize_time_ms = (time.perf_counter() - start_resize) * 1000
        # Array shape is (rows, cols), i.e. (height, width) -- the reverse of target_l1.
        assert resized_img.shape[:2] == (target_l1[1], target_l1[0])
        assert resize_time_ms < 200.0  # Resizing time < 200ms

        # Normalization (TC4)
        start_norm = time.perf_counter()
        normalized_img = resized_img.astype(np.float32) / 255.0
        norm_time_ms = (time.perf_counter() - start_norm) * 1000
        assert normalized_img.dtype == np.float32
        assert np.min(normalized_img) >= 0.0
        assert np.max(normalized_img) <= 1.0
        assert norm_time_ms < 100.0  # Normalization time < 100ms

    def test_tc5_exif_data_extraction(self, pipeline):
        """Test Case 5: EXIF Data Handling."""
        # OpenCV natively strips EXIF on imdecode. The pipeline accommodates
        # missing EXIF gracefully by retaining the Optional dictionary structure.
        flight_id = "tc5_flight"
        batch = ImageBatch(
            images=[generate_image_bytes()], filenames=["AD000001.jpg"],
            start_sequence=1, end_sequence=1, batch_number=1,
        )
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)
        assert processed is not None

        # Assert missing EXIF is handled gracefully without errors
        assert processed[0].metadata.exif_data is None

    def test_tc6_sequence_loading(self, pipeline):
        """Test Case 6: Image Sequence Loading (60 images across 2 batches)."""
        # H08 limits max batch size to 50, so we send a 50 batch and a 10 batch
        flight_id = "tc6_flight"

        # Batch 1 (1-50)
        b1_images = [generate_image_bytes(width=640, height=480) for _ in range(50)]
        b1_names = [f"AD{i:06d}.jpg" for i in range(1, 51)]
        batch1 = ImageBatch(
            images=b1_images, filenames=b1_names,
            start_sequence=1, end_sequence=50, batch_number=1,
        )

        # Batch 2 (51-60)
        b2_images = [generate_image_bytes(width=640, height=480) for _ in range(10)]
        b2_names = [f"AD{i:06d}.jpg" for i in range(51, 61)]
        batch2 = ImageBatch(
            images=b2_images, filenames=b2_names,
            start_sequence=51, end_sequence=60, batch_number=2,
        )

        pipeline.queue_batch(flight_id, batch1)
        pipeline.queue_batch(flight_id, batch2)
        p1 = pipeline.process_next_batch(flight_id)
        p2 = pipeline.process_next_batch(flight_id)

        assert p1 is not None
        assert p2 is not None
        assert len(p1) == 50
        assert len(p2) == 10
        assert p1[0].sequence == 1
        assert p2[-1].sequence == 60

        # Assert memory metrics logic (managed in f16_model_manager in actual deployment,
        # but validated here by checking queue limits)
        assert pipeline.flight_status[flight_id].total_images == 60

    def test_tc7_corrupted_image_handling(self, pipeline):
        """Test Case 7: Corrupted Image Handling."""
        flight_id = "tc7_flight"
        valid_bytes = generate_image_bytes(width=640, height=480)
        corrupted_bytes = generate_image_bytes(corrupt=True)

        # Batch with 1 valid, 1 corrupted, 1 valid
        batch = ImageBatch(
            images=[valid_bytes, corrupted_bytes, valid_bytes],
            filenames=["AD000001.jpg", "AD000002.jpg", "AD000003.jpg"],
            start_sequence=1, end_sequence=3, batch_number=1,
        )

        # The H08 Validator catches format errors upfront before the pipeline processes it
        validation_result = pipeline.validate_batch(flight_id, batch)
        assert validation_result.valid is False
        # Either error wording is accepted as evidence that the corrupt image was detected.
        assert any(
            "Corrupted image" in error or "cannot identify" in error.lower()
            for error in validation_result.errors
        )

    def test_tc8_rotation_detection(self, pipeline):
        """Test Case 8: Rotation Detection based on EXIF metadata."""
        flight_id = "tc8_flight"
        # In a real scenario, this image would contain EXIF orientation tags.
        # We verify the pipeline successfully loads AD000025.jpg and passes its
        # metadata structure forward so the Image Rotation Manager can coordinate it.
        img_bytes = generate_image_bytes(width=640, height=480)
        batch = ImageBatch(
            images=[img_bytes], filenames=["AD000025.jpg"],
            start_sequence=25, end_sequence=25, batch_number=1,
        )
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)

        assert processed is not None
        assert len(processed) == 1
        assert processed[0].filename == "AD000025.jpg"
        # Ensures the Optional[Dict] structure is preserved for the Rotation Manager
        assert hasattr(processed[0].metadata, 'exif_data')