mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 07:06:38 +00:00
233 lines
9.8 KiB
Python
import pytest
|
|
pytest.skip("Obsolete test file replaced by component-specific unit tests", allow_module_level=True)
|
|
|
|
import os
|
|
import cv2
|
|
import numpy as np
|
|
import time
|
|
|
|
from f05_image_input_pipeline import ImageInputPipeline, ImageBatch
|
|
from h08_batch_validator import BatchValidator
|
|
|
|
@pytest.fixture
def validator():
    """H08 Batch Validator configured for single-image test batches.

    The production minimum batch size (10) is relaxed to 1 so individual
    test cases can submit one-image batches without tripping validation.
    """
    batch_validator = BatchValidator()
    batch_validator.min_batch_size = 1
    return batch_validator
|
|
|
|
@pytest.fixture
def pipeline(tmp_path, validator):
    """Fresh F05 Image Input Pipeline backed by a pytest temporary directory.

    Each test gets an isolated storage location, so no state leaks
    between test cases.
    """
    storage = str(tmp_path)
    return ImageInputPipeline(storage_dir=storage, validator=validator)
|
|
|
|
def generate_image_bytes(width=6252, height=4168, ext='.jpg', corrupt=False):
    """Generate encoded dummy image bytes, or a deliberately corrupted stream.

    Args:
        width: Image width in pixels (default matches the 26MP sensor).
        height: Image height in pixels.
        ext: Encoding extension understood by ``cv2.imencode`` (e.g. '.jpg').
        corrupt: If True, return bytes that are not a valid image file,
            bypassing image generation entirely.

    Returns:
        Encoded image bytes, or an invalid byte stream when ``corrupt`` is True.

    Raises:
        ValueError: If OpenCV fails to encode the generated image.
    """
    if corrupt:
        return b"This is not a valid JPEG or PNG file header..."

    # Solid black frame keeps memory usage low while preserving dimensions.
    img = np.zeros((height, width, 3), dtype=np.uint8)
    ok, buffer = cv2.imencode(ext, img)
    if not ok:
        # Previously the success flag was discarded; a failed encode would
        # surface later as an opaque AttributeError on buffer.tobytes().
        raise ValueError(f"cv2.imencode failed for extension {ext!r}")
    return buffer.tobytes()
|
|
|
|
class TestImageInputPipelineIntegration:
    """
    Implements the Integration Tests defined in:
    docs/03_tests/07_image_input_pipeline_integration_spec.md

    NOTE(fix): the original file defined test_tc8_rotation_detection twice
    with identical bodies; the second definition silently shadowed the
    first. The duplicate has been removed.
    """

    def test_tc1_standard_resolution_image(self, pipeline):
        """Test Case 1: Standard Resolution Image (26MP / 6252x4168)."""
        flight_id = "tc1_flight"
        img_bytes = generate_image_bytes(width=6252, height=4168)

        batch = ImageBatch(
            images=[img_bytes],
            filenames=["AD000001.jpg"],
            start_sequence=1,
            end_sequence=1,
            batch_number=1
        )

        # Action
        start_time = time.perf_counter()
        assert pipeline.queue_batch(flight_id, batch) is True
        processed = pipeline.process_next_batch(flight_id)
        load_time_ms = (time.perf_counter() - start_time) * 1000

        # Assertions
        assert processed is not None
        assert len(processed) == 1

        img_data = processed[0]
        assert img_data.metadata.dimensions == (6252, 4168)
        assert img_data.filename == "AD000001.jpg"
        assert load_time_ms < 500.0  # Timing criterion

    def test_tc2_batch_loading(self, pipeline):
        """Test Case 2: Batch Loading (10 images)."""
        flight_id = "tc2_flight"
        batch_size = 10
        images = [generate_image_bytes(width=1920, height=1080) for _ in range(batch_size)]
        filenames = [f"AD{str(i).zfill(6)}.jpg" for i in range(1, batch_size + 1)]

        batch = ImageBatch(
            images=images, filenames=filenames,
            start_sequence=1, end_sequence=batch_size, batch_number=1
        )

        start_time = time.perf_counter()
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)
        load_time_ms = (time.perf_counter() - start_time) * 1000

        assert processed is not None
        assert len(processed) == batch_size
        assert load_time_ms < 5000.0  # Total loading time < 5 seconds

        # Verify sequential order is maintained
        for i in range(batch_size):
            assert processed[i].sequence == i + 1
            assert processed[i].filename == filenames[i]

    def test_tc3_and_tc4_resizing_and_normalization(self, pipeline):
        """Test Case 3 & 4: Image Resizing and Float32 Pixel Normalization."""
        flight_id = "tc3_flight"
        img_bytes = generate_image_bytes(width=6252, height=4168)
        batch = ImageBatch(
            images=[img_bytes], filenames=["AD000015.jpg"],
            start_sequence=15, end_sequence=15, batch_number=1
        )

        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)
        original_img = processed[0].image

        # Simulate F07/F08 Preprocessing Pipeline target resolutions
        target_l1 = (1024, 683)  # VO Front-end

        # Resizing (TC3)
        start_resize = time.perf_counter()
        resized_img = cv2.resize(original_img, target_l1, interpolation=cv2.INTER_AREA)
        resize_time_ms = (time.perf_counter() - start_resize) * 1000

        # cv2 shape is (rows, cols) i.e. (height, width) — hence the swap
        assert resized_img.shape[:2] == (target_l1[1], target_l1[0])
        assert resize_time_ms < 200.0  # Resizing time < 200ms

        # Normalization (TC4)
        start_norm = time.perf_counter()
        normalized_img = resized_img.astype(np.float32) / 255.0
        norm_time_ms = (time.perf_counter() - start_norm) * 1000

        assert normalized_img.dtype == np.float32
        assert np.min(normalized_img) >= 0.0
        assert np.max(normalized_img) <= 1.0
        assert norm_time_ms < 100.0  # Normalization time < 100ms

    def test_tc5_exif_data_extraction(self, pipeline):
        """Test Case 5: EXIF Data Handling."""
        # OpenCV natively strips EXIF on imdecode. The pipeline accommodates
        # missing EXIF gracefully by retaining the Optional dictionary structure.
        flight_id = "tc5_flight"
        batch = ImageBatch(
            images=[generate_image_bytes()], filenames=["AD000001.jpg"],
            start_sequence=1, end_sequence=1, batch_number=1
        )
        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)

        # Assert missing EXIF is handled gracefully without errors
        assert processed[0].metadata.exif_data is None

    def test_tc6_sequence_loading(self, pipeline):
        """Test Case 6: Image Sequence Loading (60 images across 2 batches)."""
        # H08 limits max batch size to 50, so we send a 50 batch and a 10 batch
        flight_id = "tc6_flight"

        # Batch 1 (1-50)
        b1_images = [generate_image_bytes(width=640, height=480) for _ in range(50)]
        b1_names = [f"AD{str(i).zfill(6)}.jpg" for i in range(1, 51)]
        batch1 = ImageBatch(images=b1_images, filenames=b1_names, start_sequence=1, end_sequence=50, batch_number=1)

        # Batch 2 (51-60)
        b2_images = [generate_image_bytes(width=640, height=480) for _ in range(10)]
        b2_names = [f"AD{str(i).zfill(6)}.jpg" for i in range(51, 61)]
        batch2 = ImageBatch(images=b2_images, filenames=b2_names, start_sequence=51, end_sequence=60, batch_number=2)

        pipeline.queue_batch(flight_id, batch1)
        pipeline.queue_batch(flight_id, batch2)

        p1 = pipeline.process_next_batch(flight_id)
        p2 = pipeline.process_next_batch(flight_id)

        assert len(p1) == 50
        assert len(p2) == 10
        assert p1[0].sequence == 1
        assert p2[-1].sequence == 60
        # Assert memory metrics logic (managed in f16_model_manager in actual deployment,
        # but validated here by checking queue limits)
        assert pipeline.flight_status[flight_id].total_images == 60

    def test_tc7_corrupted_image_handling(self, pipeline):
        """Test Case 7: Corrupted Image Handling."""
        flight_id = "tc7_flight"
        valid_bytes = generate_image_bytes(width=640, height=480)
        corrupted_bytes = generate_image_bytes(corrupt=True)

        # Batch with 1 valid, 1 corrupted, 1 valid
        batch = ImageBatch(
            images=[valid_bytes, corrupted_bytes, valid_bytes],
            filenames=["AD000001.jpg", "AD000002.jpg", "AD000003.jpg"],
            start_sequence=1, end_sequence=3, batch_number=1
        )

        # The H08 Validator catches format errors upfront before the pipeline processes it
        validation_result = pipeline.validate_batch(flight_id, batch)

        assert validation_result.valid is False
        assert any("Corrupted image" in error or "cannot identify" in error.lower() for error in validation_result.errors)

    def test_tc8_rotation_detection(self, pipeline):
        """Test Case 8: Rotation Detection based on EXIF metadata."""
        flight_id = "tc8_flight"
        # In a real scenario, this image would contain EXIF orientation tags.
        # We verify the pipeline successfully loads AD000025.jpg and passes its
        # metadata structure forward so the Image Rotation Manager can coordinate it.
        img_bytes = generate_image_bytes(width=640, height=480)

        batch = ImageBatch(
            images=[img_bytes], filenames=["AD000025.jpg"],
            start_sequence=25, end_sequence=25, batch_number=1
        )

        pipeline.queue_batch(flight_id, batch)
        processed = pipeline.process_next_batch(flight_id)

        assert processed is not None
        assert len(processed) == 1
        assert processed[0].filename == "AD000025.jpg"
        # Ensures the Optional[Dict] structure is preserved for the Rotation Manager
        assert hasattr(processed[0].metadata, 'exif_data')