Files
gps-denied-onboard/tests/test_pipeline.py
T

83 lines
2.5 KiB
Python

"""Tests for Image Input Pipeline (F05)."""
import asyncio
import cv2
import numpy as np
import pytest
from gps_denied.core.pipeline import ImageInputPipeline, QueueFullError, ValidationError
from gps_denied.schemas.image import ImageBatch
@pytest.fixture
def pipeline(tmp_path):
storage = str(tmp_path / "images")
return ImageInputPipeline(storage_dir=storage, max_queue_size=2)
def test_batch_validation(pipeline):
# Too few images
b1 = ImageBatch(images=[b"1", b"2"], filenames=["1.jpg", "2.jpg"], start_sequence=1, end_sequence=2, batch_number=1)
val = pipeline.validate_batch(b1)
assert not val.valid
assert "Batch is empty" in val.errors
# Let's mock a valid batch of 10 images
fake_imgs = [b"fake"] * 10
fake_names = [f"AD{i:06d}.jpg" for i in range(1, 11)]
b2 = ImageBatch(images=fake_imgs, filenames=fake_names, start_sequence=1, end_sequence=10, batch_number=1)
val2 = pipeline.validate_batch(b2)
assert val2.valid
@pytest.mark.asyncio
async def test_queue_and_process(pipeline):
flight_id = "test_f1"
# Create valid fake images
fake_img_np = np.zeros((10, 10, 3), dtype=np.uint8)
_, encoded = cv2.imencode(".jpg", fake_img_np)
fake_bytes = encoded.tobytes()
fake_imgs = [fake_bytes] * 10
fake_names = [f"AD{i:06d}.jpg" for i in range(1, 11)]
b = ImageBatch(images=fake_imgs, filenames=fake_names, start_sequence=1, end_sequence=10, batch_number=1)
pipeline.queue_batch(flight_id, b)
# Process
processed = await pipeline.process_next_batch(flight_id)
assert processed is not None
assert len(processed.images) == 10
assert processed.images[0].sequence == 1
assert processed.images[-1].sequence == 10
# Status
st = pipeline.get_processing_status(flight_id)
assert st.total_images == 10
assert st.processed_images == 10
# Sequential get
next_img = pipeline.get_next_image(flight_id)
assert next_img is not None
assert next_img.sequence == 1
# Second get
next_img2 = pipeline.get_next_image(flight_id)
assert next_img2 is not None
assert next_img2.sequence == 2
def test_queue_full(pipeline):
flight_id = "test_full"
fake_imgs = [b"fake"] * 10
fake_names = [f"AD{i:06d}.jpg" for i in range(1, 11)]
b = ImageBatch(images=fake_imgs, filenames=fake_names, start_sequence=1, end_sequence=10, batch_number=1)
pipeline.queue_batch(flight_id, b)
pipeline.queue_batch(flight_id, b)
with pytest.raises(QueueFullError):
pipeline.queue_batch(flight_id, b)