"""Tests for ImageInputPipeline.

Covers batch validation, per-flight queueing, batch processing,
image storage/retrieval, and processing-status reporting.
"""
import os
import shutil
import tempfile
import threading

import cv2
import numpy as np
import pytest

from f05_image_input_pipeline import ImageInputPipeline, ImageBatch


@pytest.fixture
def pipeline():
    """Yield a pipeline backed by a throwaway storage directory."""
    temp_dir = tempfile.mkdtemp()
    try:
        # Construct inside the try so the directory is removed even when
        # the pipeline constructor itself raises.
        yield ImageInputPipeline(storage_dir=temp_dir)
    finally:
        shutil.rmtree(temp_dir)


@pytest.fixture
def valid_image_bytes():
    """Return the bytes of a black 640x480 JPEG."""
    # Create a valid 640x480 JPEG image byte string to pass dimension validation
    img = np.zeros((480, 640, 3), dtype=np.uint8)
    success, encoded = cv2.imencode('.jpg', img)
    assert success
    return encoded.tobytes()


def create_valid_batch(start_seq: int, count: int, image_bytes: bytes, batch_num: int = 1) -> ImageBatch:
    """Build an ImageBatch of ``count`` copies of ``image_bytes``.

    Filenames follow the ``AD{seq:06d}.jpg`` pattern for the consecutive
    sequence numbers ``start_seq`` .. ``start_seq + count - 1``.
    """
    end_seq = start_seq + count - 1
    return ImageBatch(
        images=[image_bytes] * count,
        filenames=[f"AD{seq:06d}.jpg" for seq in range(start_seq, end_seq + 1)],
        start_sequence=start_seq,
        end_sequence=end_seq,
        batch_number=batch_num,
    )


class TestImageInputPipeline:
    """Behavioural tests for ImageInputPipeline."""

    # --- Validation Tests ---

    def test_validate_batch_size_min(self, pipeline, valid_image_bytes):
        """A batch of 9 images is rejected with a 'below minimum' error."""
        # 9 images -> invalid (min 10)
        batch = create_valid_batch(1, 9, valid_image_bytes)
        val = pipeline.validate_batch(batch)
        assert val.valid is False
        assert "below minimum 10" in val.errors[0]

    def test_validate_batch_size_max(self, pipeline, valid_image_bytes):
        """A batch of 51 images is rejected."""
        # 51 images -> invalid (max 50)
        batch = create_valid_batch(1, 51, valid_image_bytes)
        assert pipeline.validate_batch(batch).valid is False

    def test_validate_batch_naming_convention(self, pipeline, valid_image_bytes):
        """A batch with conforming filenames validates successfully."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.validate_batch(batch).valid is True

    def test_validate_batch_invalid_format(self, pipeline, valid_image_bytes):
        """A non-conforming filename is reported as invalid naming."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        batch.filenames[0] = "IMG_0001.jpg"
        val = pipeline.validate_batch(batch)
        assert val.valid is False
        assert "Invalid naming" in val.errors[0]

    def test_validate_batch_file_size(self, pipeline, valid_image_bytes):
        """An oversized image payload is reported as exceeding 10MB."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        # Mock one image being > 10MB
        batch.images[0] = b"0" * (11 * 1024 * 1024)
        val = pipeline.validate_batch(batch)
        assert val.valid is False
        assert "Size > 10MB" in val.errors[0]

    # --- Queue & Internal Helper Tests ---

    def test_queue_batch_valid(self, pipeline, valid_image_bytes):
        """Queueing one valid batch succeeds and consumes one queue slot."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_1", batch) is True
        assert pipeline._get_queue_capacity("flight_1") == 9  # Max 10 - 1

    def test_queue_batch_sequence_gap(self, pipeline, valid_image_bytes):
        """A batch that skips sequence numbers is rejected."""
        batch1 = create_valid_batch(1, 10, valid_image_bytes)
        # The first batch must be accepted, otherwise the rejection below
        # would not be attributable to the sequence gap.
        assert pipeline.queue_batch("flight_gap", batch1) is True
        # Missing batch 11-20, queueing 21-30 directly
        batch3 = create_valid_batch(21, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_gap", batch3) is False

    def test_queue_batch_queue_full(self, pipeline, valid_image_bytes):
        """The 11th batch is rejected once 10 batches are queued."""
        # Fill queue to capacity (10 batches)
        for i in range(10):
            batch = create_valid_batch((i * 10) + 1, 10, valid_image_bytes)
            assert pipeline.queue_batch("flight_full", batch) is True
        # 11th batch should be rejected
        batch11 = create_valid_batch(101, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_full", batch11) is False
        assert pipeline._get_queue_capacity("flight_full") == 0

    # --- Processing Tests ---

    def test_process_next_batch_dequeue(self, pipeline, valid_image_bytes):
        """Processing removes the batch from the queue and returns it."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_proc", batch) is True
        processed = pipeline.process_next_batch("flight_proc")
        assert processed is not None
        assert len(processed.images) == 10
        assert processed.start_sequence == 1
        assert pipeline._get_queue_capacity("flight_proc") == 10

    def test_process_next_batch_empty_queue(self, pipeline):
        """Processing a flight with nothing queued returns None."""
        assert pipeline.process_next_batch("flight_empty") is None

    def test_process_next_batch_corrupted_image(self, pipeline, valid_image_bytes):
        """A batch containing undecodable images is refused at queue time."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        # Corrupt 2 images
        batch.images[2] = b"corrupted_nonsense"
        batch.images[5] = b"corrupted_nonsense"
        # H08 format validation catches this upfront, so the batch is never queued
        assert pipeline.queue_batch("flight_corrupt", batch) is False

    def test_process_next_batch_metadata_extraction(self, pipeline, valid_image_bytes):
        """Processed images carry sequence, dimensions and file size."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_meta", batch) is True
        processed = pipeline.process_next_batch("flight_meta")
        # Fail with a clear message instead of an AttributeError on None.
        assert processed is not None
        meta = processed.images[0].metadata
        assert meta.sequence == 1
        assert meta.dimensions == (640, 480)
        assert meta.file_size == len(valid_image_bytes)

    def test_fifo_order(self, pipeline, valid_image_bytes):
        """Batches are processed in the order they were queued."""
        batch1 = create_valid_batch(1, 10, valid_image_bytes, batch_num=1)
        batch2 = create_valid_batch(11, 10, valid_image_bytes, batch_num=2)
        assert pipeline.queue_batch("flight_fifo", batch1) is True
        assert pipeline.queue_batch("flight_fifo", batch2) is True
        p1 = pipeline.process_next_batch("flight_fifo")
        p2 = pipeline.process_next_batch("flight_fifo")
        assert p1 is not None
        assert p2 is not None
        assert p1.start_sequence == 1
        assert p2.start_sequence == 11

    # --- Storage & Retrieval Tests ---

    def test_store_images_success(self, pipeline, valid_image_bytes):
        """Storing decoded images reports success."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_store", batch)
        assert pipeline.store_images("flight_store", processed) is True

    def test_store_images_creates_directory(self, pipeline, valid_image_bytes):
        """Storing images creates a per-flight directory under storage_dir."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_dir", batch)
        pipeline.store_images("flight_dir", processed)
        assert os.path.exists(os.path.join(pipeline.storage_dir, "flight_dir"))

    def test_store_images_updates_metadata(self, pipeline, valid_image_bytes):
        """Storing images writes metadata.json in the flight directory."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_meta", batch)
        pipeline.store_images("flight_meta", processed)
        index_path = os.path.join(pipeline.storage_dir, "flight_meta", "metadata.json")
        assert os.path.exists(index_path)

    def test_store_images_disk_full(self, pipeline, valid_image_bytes, monkeypatch):
        """A failing cv2.imwrite makes store_images report failure."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_full", batch)
        # Mock cv2.imwrite to fail
        monkeypatch.setattr("cv2.imwrite", lambda *args, **kwargs: False)
        assert pipeline.store_images("flight_full", processed) is False

    def test_get_next_image_sequential(self, pipeline, valid_image_bytes):
        """Consecutive get_next_image calls return sequences 1 then 2."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_seq", batch)
        pipeline.store_images("flight_seq", processed)
        img1 = pipeline.get_next_image("flight_seq")
        img2 = pipeline.get_next_image("flight_seq")
        assert img1 is not None
        assert img2 is not None
        assert img1.sequence == 1
        assert img2.sequence == 2

    def test_get_next_image_increments_counter(self, pipeline, valid_image_bytes):
        """get_next_image advances the flight's sequence tracker by one."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_inc", batch)
        pipeline.store_images("flight_inc", processed)
        assert pipeline._get_sequence_tracker("flight_inc") == 1
        pipeline.get_next_image("flight_inc")
        assert pipeline._get_sequence_tracker("flight_inc") == 2

    def test_get_next_image_end_of_sequence(self, pipeline, valid_image_bytes):
        """get_next_image returns None once all stored images are consumed."""
        batch = create_valid_batch(1, 2, valid_image_bytes)
        processed = pipeline._decode_images("flight_end", batch)
        pipeline.store_images("flight_end", processed)
        pipeline.get_next_image("flight_end")
        pipeline.get_next_image("flight_end")
        assert pipeline.get_next_image("flight_end") is None

    def test_get_next_image_missing_file(self, pipeline):
        """get_next_image returns None for a flight with nothing stored."""
        assert pipeline.get_next_image("flight_missing") is None

    def test_get_image_by_sequence_valid(self, pipeline, valid_image_bytes):
        """A stored image can be fetched directly by sequence number."""
        batch = create_valid_batch(100, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_by_seq", batch)
        pipeline.store_images("flight_by_seq", processed)
        img = pipeline.get_image_by_sequence("flight_by_seq", 105)
        assert img is not None
        assert img.sequence == 105

    def test_get_image_by_sequence_invalid(self, pipeline, valid_image_bytes):
        """Fetching a sequence number that was never stored returns None."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_inv_seq", batch)
        pipeline.store_images("flight_inv_seq", processed)
        assert pipeline.get_image_by_sequence("flight_inv_seq", 999) is None

    def test_get_image_by_sequence_constructs_filename(self, pipeline):
        """Sequence 101 maps to the filename AD000101.jpg."""
        assert pipeline._construct_filename(101) == "AD000101.jpg"

    def test_get_image_metadata_fast(self, pipeline, valid_image_bytes):
        """Metadata for a stored image can be looked up by sequence."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        processed = pipeline._decode_images("flight_fast_meta", batch)
        pipeline.store_images("flight_fast_meta", processed)
        meta = pipeline.get_image_metadata("flight_fast_meta", 5)
        assert meta is not None
        assert meta.sequence == 5

    def test_get_processing_status_counts(self, pipeline, valid_image_bytes):
        """Status reports total and processed image counts after processing."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_status", batch) is True
        pipeline.process_next_batch("flight_status")
        status = pipeline.get_processing_status("flight_status")
        assert status.total_images == 10
        assert status.processed_images == 10

    def test_get_processing_status_current_sequence(self, pipeline, valid_image_bytes):
        """Status reports the next sequence after two images are consumed."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_curr_seq", batch) is True
        pipeline.process_next_batch("flight_curr_seq")
        pipeline.get_next_image("flight_curr_seq")
        pipeline.get_next_image("flight_curr_seq")
        status = pipeline.get_processing_status("flight_curr_seq")
        assert status.current_sequence == 3

    def test_get_processing_status_queued_batches(self, pipeline, valid_image_bytes):
        """Status reports the number of batches still waiting in the queue."""
        first = create_valid_batch(1, 10, valid_image_bytes)
        second = create_valid_batch(11, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_q_batch", first) is True
        assert pipeline.queue_batch("flight_q_batch", second) is True
        status = pipeline.get_processing_status("flight_q_batch")
        assert status.queued_batches == 2

    def test_get_processing_status_rate(self, pipeline, valid_image_bytes):
        """Status reports a positive processing rate after a batch is processed."""
        batch = create_valid_batch(1, 10, valid_image_bytes)
        assert pipeline.queue_batch("flight_rate", batch) is True
        pipeline.process_next_batch("flight_rate")
        status = pipeline.get_processing_status("flight_rate")
        assert status.processing_rate > 0.0