mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 08:56:37 +00:00
260 lines
11 KiB
Python
260 lines
11 KiB
Python
import pytest
|
|
import cv2
|
|
import os
|
|
import numpy as np
|
|
import threading
|
|
import tempfile
|
|
import shutil
|
|
|
|
from f05_image_input_pipeline import ImageInputPipeline, ImageBatch
|
|
|
|
@pytest.fixture
|
|
def pipeline():
|
|
temp_dir = tempfile.mkdtemp()
|
|
p = ImageInputPipeline(storage_dir=temp_dir)
|
|
yield p
|
|
shutil.rmtree(temp_dir)
|
|
|
|
@pytest.fixture
|
|
def valid_image_bytes():
|
|
# Create a valid 640x480 JPEG image byte string to pass dimension validation
|
|
img = np.zeros((480, 640, 3), dtype=np.uint8)
|
|
success, encoded = cv2.imencode('.jpg', img)
|
|
assert success
|
|
return encoded.tobytes()
|
|
|
|
def create_valid_batch(start_seq: int, count: int, image_bytes: bytes, batch_num: int = 1) -> ImageBatch:
|
|
return ImageBatch(
|
|
images=[image_bytes for _ in range(count)],
|
|
filenames=[f"AD{seq:06d}.jpg" for seq in range(start_seq, start_seq + count)],
|
|
start_sequence=start_seq,
|
|
end_sequence=start_seq + count - 1,
|
|
batch_number=batch_num
|
|
)
|
|
|
|
class TestImageInputPipeline:
|
|
# --- Validation Tests ---
|
|
|
|
def test_validate_batch_size_min(self, pipeline, valid_image_bytes):
|
|
# 9 images -> invalid (min 10)
|
|
batch = create_valid_batch(1, 9, valid_image_bytes)
|
|
val = pipeline.validate_batch(batch)
|
|
assert val.valid is False
|
|
assert "below minimum 10" in val.errors[0]
|
|
|
|
def test_validate_batch_size_max(self, pipeline, valid_image_bytes):
|
|
# 51 images -> invalid (max 50)
|
|
batch = create_valid_batch(1, 51, valid_image_bytes)
|
|
assert pipeline.validate_batch(batch).valid is False
|
|
|
|
def test_validate_batch_naming_convention(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
assert pipeline.validate_batch(batch).valid is True
|
|
|
|
def test_validate_batch_invalid_format(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
batch.filenames[0] = "IMG_0001.jpg"
|
|
val = pipeline.validate_batch(batch)
|
|
assert val.valid is False
|
|
assert "Invalid naming" in val.errors[0]
|
|
|
|
def test_validate_batch_file_size(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
# Mock one image being > 10MB
|
|
batch.images[0] = b"0" * (11 * 1024 * 1024)
|
|
val = pipeline.validate_batch(batch)
|
|
assert val.valid is False
|
|
assert "Size > 10MB" in val.errors[0]
|
|
|
|
# --- Queue & Internal Helper Tests ---
|
|
|
|
def test_queue_batch_valid(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
assert pipeline.queue_batch("flight_1", batch) is True
|
|
assert pipeline._get_queue_capacity("flight_1") == 9 # Max 10 - 1
|
|
|
|
def test_queue_batch_sequence_gap(self, pipeline, valid_image_bytes):
|
|
batch1 = create_valid_batch(1, 10, valid_image_bytes)
|
|
pipeline.queue_batch("flight_gap", batch1)
|
|
|
|
# Missing batch 11-20, queueing 21-30 directly
|
|
batch3 = create_valid_batch(21, 10, valid_image_bytes)
|
|
assert pipeline.queue_batch("flight_gap", batch3) is False
|
|
|
|
def test_queue_batch_queue_full(self, pipeline, valid_image_bytes):
|
|
# Fill queue to capacity (10 batches)
|
|
for i in range(10):
|
|
batch = create_valid_batch((i * 10) + 1, 10, valid_image_bytes)
|
|
assert pipeline.queue_batch("flight_full", batch) is True
|
|
|
|
# 11th batch should be rejected
|
|
batch11 = create_valid_batch(101, 10, valid_image_bytes)
|
|
assert pipeline.queue_batch("flight_full", batch11) is False
|
|
assert pipeline._get_queue_capacity("flight_full") == 0
|
|
|
|
# --- Processing Tests ---
|
|
|
|
def test_process_next_batch_dequeue(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
pipeline.queue_batch("flight_proc", batch)
|
|
|
|
processed = pipeline.process_next_batch("flight_proc")
|
|
assert processed is not None
|
|
assert len(processed.images) == 10
|
|
assert processed.start_sequence == 1
|
|
assert pipeline._get_queue_capacity("flight_proc") == 10
|
|
|
|
def test_process_next_batch_empty_queue(self, pipeline):
|
|
assert pipeline.process_next_batch("flight_empty") is None
|
|
|
|
def test_process_next_batch_corrupted_image(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
# Corrupt 2 images
|
|
batch.images[2] = b"corrupted_nonsense"
|
|
batch.images[5] = b"corrupted_nonsense"
|
|
|
|
# H08 format validation catches this upfront, preventing the queue
|
|
assert pipeline.queue_batch("flight_corrupt", batch) is False
|
|
|
|
def test_process_next_batch_metadata_extraction(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
pipeline.queue_batch("flight_meta", batch)
|
|
|
|
processed = pipeline.process_next_batch("flight_meta")
|
|
meta = processed.images[0].metadata
|
|
assert meta.sequence == 1
|
|
assert meta.dimensions == (640, 480)
|
|
assert meta.file_size == len(valid_image_bytes)
|
|
|
|
def test_fifo_order(self, pipeline, valid_image_bytes):
|
|
batch1 = create_valid_batch(1, 10, valid_image_bytes, batch_num=1)
|
|
batch2 = create_valid_batch(11, 10, valid_image_bytes, batch_num=2)
|
|
|
|
pipeline.queue_batch("flight_fifo", batch1)
|
|
pipeline.queue_batch("flight_fifo", batch2)
|
|
|
|
p1 = pipeline.process_next_batch("flight_fifo")
|
|
p2 = pipeline.process_next_batch("flight_fifo")
|
|
|
|
assert p1.start_sequence == 1
|
|
assert p2.start_sequence == 11
|
|
|
|
# --- Storage & Retrieval Tests ---
|
|
|
|
def test_store_images_success(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_store", batch)
|
|
assert pipeline.store_images("flight_store", processed) is True
|
|
|
|
def test_store_images_creates_directory(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_dir", batch)
|
|
pipeline.store_images("flight_dir", processed)
|
|
assert os.path.exists(os.path.join(pipeline.storage_dir, "flight_dir"))
|
|
|
|
def test_store_images_updates_metadata(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_meta", batch)
|
|
pipeline.store_images("flight_meta", processed)
|
|
|
|
index_path = os.path.join(pipeline.storage_dir, "flight_meta", "metadata.json")
|
|
assert os.path.exists(index_path)
|
|
|
|
def test_store_images_disk_full(self, pipeline, valid_image_bytes, monkeypatch):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_full", batch)
|
|
|
|
# Mock cv2.imwrite to fail
|
|
monkeypatch.setattr("cv2.imwrite", lambda *args, **kwargs: False)
|
|
assert pipeline.store_images("flight_full", processed) is False
|
|
|
|
def test_get_next_image_sequential(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_seq", batch)
|
|
pipeline.store_images("flight_seq", processed)
|
|
|
|
img1 = pipeline.get_next_image("flight_seq")
|
|
img2 = pipeline.get_next_image("flight_seq")
|
|
|
|
assert img1.sequence == 1
|
|
assert img2.sequence == 2
|
|
|
|
def test_get_next_image_increments_counter(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_inc", batch)
|
|
pipeline.store_images("flight_inc", processed)
|
|
|
|
assert pipeline._get_sequence_tracker("flight_inc") == 1
|
|
pipeline.get_next_image("flight_inc")
|
|
assert pipeline._get_sequence_tracker("flight_inc") == 2
|
|
|
|
def test_get_next_image_end_of_sequence(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 2, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_end", batch)
|
|
pipeline.store_images("flight_end", processed)
|
|
|
|
pipeline.get_next_image("flight_end")
|
|
pipeline.get_next_image("flight_end")
|
|
assert pipeline.get_next_image("flight_end") is None
|
|
|
|
def test_get_next_image_missing_file(self, pipeline):
|
|
assert pipeline.get_next_image("flight_missing") is None
|
|
|
|
def test_get_image_by_sequence_valid(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(100, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_by_seq", batch)
|
|
pipeline.store_images("flight_by_seq", processed)
|
|
|
|
img = pipeline.get_image_by_sequence("flight_by_seq", 105)
|
|
assert img is not None
|
|
assert img.sequence == 105
|
|
|
|
def test_get_image_by_sequence_invalid(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_inv_seq", batch)
|
|
pipeline.store_images("flight_inv_seq", processed)
|
|
|
|
assert pipeline.get_image_by_sequence("flight_inv_seq", 999) is None
|
|
|
|
def test_get_image_by_sequence_constructs_filename(self, pipeline):
|
|
assert pipeline._construct_filename(101) == "AD000101.jpg"
|
|
|
|
def test_get_image_metadata_fast(self, pipeline, valid_image_bytes):
|
|
batch = create_valid_batch(1, 10, valid_image_bytes)
|
|
processed = pipeline._decode_images("flight_fast_meta", batch)
|
|
pipeline.store_images("flight_fast_meta", processed)
|
|
|
|
meta = pipeline.get_image_metadata("flight_fast_meta", 5)
|
|
assert meta is not None
|
|
assert meta.sequence == 5
|
|
|
|
def test_get_processing_status_counts(self, pipeline, valid_image_bytes):
|
|
pipeline.queue_batch("flight_status", create_valid_batch(1, 10, valid_image_bytes))
|
|
pipeline.process_next_batch("flight_status")
|
|
|
|
status = pipeline.get_processing_status("flight_status")
|
|
assert status.total_images == 10
|
|
assert status.processed_images == 10
|
|
|
|
def test_get_processing_status_current_sequence(self, pipeline, valid_image_bytes):
|
|
pipeline.queue_batch("flight_curr_seq", create_valid_batch(1, 10, valid_image_bytes))
|
|
pipeline.process_next_batch("flight_curr_seq")
|
|
pipeline.get_next_image("flight_curr_seq")
|
|
pipeline.get_next_image("flight_curr_seq")
|
|
|
|
status = pipeline.get_processing_status("flight_curr_seq")
|
|
assert status.current_sequence == 3
|
|
|
|
def test_get_processing_status_queued_batches(self, pipeline, valid_image_bytes):
|
|
pipeline.queue_batch("flight_q_batch", create_valid_batch(1, 10, valid_image_bytes))
|
|
pipeline.queue_batch("flight_q_batch", create_valid_batch(11, 10, valid_image_bytes))
|
|
|
|
status = pipeline.get_processing_status("flight_q_batch")
|
|
assert status.queued_batches == 2
|
|
|
|
def test_get_processing_status_rate(self, pipeline, valid_image_bytes):
|
|
pipeline.queue_batch("flight_rate", create_valid_batch(1, 10, valid_image_bytes))
|
|
pipeline.process_next_batch("flight_rate")
|
|
|
|
status = pipeline.get_processing_status("flight_rate")
|
|
assert status.processing_rate > 0.0 |