mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 08:36:36 +00:00
260 lines
10 KiB
Python
260 lines
10 KiB
Python
import pytest
|
|
import time
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import Mock
|
|
|
|
from f02_1_flight_lifecycle_manager import Flight, GPSPoint, CameraParameters, Waypoint, FlightState
|
|
from f03_flight_database import FlightDatabase, ChunkHandle, FrameResult
|
|
|
|
@pytest.fixture
|
|
def db():
|
|
"""Provides an isolated, in-memory SQLite database instance."""
|
|
return FlightDatabase("sqlite:///:memory:")
|
|
|
|
@pytest.fixture
|
|
def sample_flight():
|
|
return Flight(
|
|
flight_id="f_abc123",
|
|
flight_name="Test DB Mission",
|
|
start_gps=GPSPoint(lat=48.0, lon=37.0),
|
|
altitude_m=400.0,
|
|
camera_params=CameraParameters(focal_length_mm=25, sensor_width_mm=23.5, resolution={"width": 6000, "height": 4000})
|
|
)
|
|
|
|
class TestFlightDatabase:
|
|
|
|
# --- Feature 03.01: CRUD Operations & Transactions ---
|
|
|
|
def test_insert_and_get_flight(self, db, sample_flight):
|
|
flight_id = db.insert_flight(sample_flight)
|
|
assert flight_id == "f_abc123"
|
|
|
|
fetched = db.get_flight_by_id(flight_id)
|
|
assert fetched is not None
|
|
assert fetched.flight_name == "Test DB Mission"
|
|
assert fetched.start_gps.lat == 48.0
|
|
|
|
def test_internal_helpers(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
|
|
# Test _execute_with_retry
|
|
mock_op = Mock(side_effect=[Exception("Transient"), "Success"])
|
|
result = db._execute_with_retry(mock_op, retries=3)
|
|
assert result == "Success"
|
|
assert mock_op.call_count == 2
|
|
|
|
# Test query filters
|
|
db.save_flight_state(FlightState(flight_id=sample_flight.flight_id, state="active"))
|
|
flights = db.query_flights({"status": "active", "name": "Test%"}, limit=10)
|
|
assert len(flights) == 1
|
|
flights_empty = db.query_flights({"status": "completed"}, limit=10)
|
|
assert len(flights_empty) == 0
|
|
|
|
def test_insert_duplicate_flight_raises_error(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
with pytest.raises(ValueError, match="Duplicate flight or integrity error"):
|
|
db.insert_flight(sample_flight)
|
|
|
|
def test_update_flight(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
sample_flight.flight_name = "Updated Mission"
|
|
|
|
success = db.update_flight(sample_flight)
|
|
assert success is True
|
|
|
|
fetched = db.get_flight_by_id("f_abc123")
|
|
assert fetched.flight_name == "Updated Mission"
|
|
|
|
def test_execute_transaction_commit(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
|
|
def update_op1():
|
|
db.insert_waypoint("f_abc123", Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()))
|
|
def update_op2():
|
|
db.insert_waypoint("f_abc123", Waypoint(id="w2", lat=48.2, lon=37.2, confidence=0.9, timestamp=datetime.utcnow()))
|
|
|
|
success = db.execute_transaction([update_op1, update_op2])
|
|
assert success is True
|
|
assert len(db.get_waypoints("f_abc123")) == 2
|
|
|
|
def test_execute_transaction_rollback(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
|
|
def update_op1():
|
|
db.insert_waypoint("f_abc123", Waypoint(id="w3", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()))
|
|
def fail_op2():
|
|
raise Exception("Simulated Failure")
|
|
|
|
success = db.execute_transaction([update_op1, fail_op2])
|
|
assert success is False
|
|
# Verify atomicity: w3 should NOT be in the database because of the rollback
|
|
assert len(db.get_waypoints("f_abc123")) == 0
|
|
|
|
def test_batch_update_waypoints(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
wp1 = Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow())
|
|
wp2 = Waypoint(id="w2", lat=48.2, lon=37.2, confidence=0.9, timestamp=datetime.utcnow())
|
|
db.insert_waypoint("f_abc123", wp1)
|
|
db.insert_waypoint("f_abc123", wp2)
|
|
|
|
# Modify waypoints
|
|
wp1.lat = 50.0
|
|
wp2.lon = 40.0
|
|
|
|
res = db.batch_update_waypoints("f_abc123", [wp1, wp2])
|
|
assert res.success is True
|
|
assert res.updated_count == 2
|
|
|
|
wps = db.get_waypoints("f_abc123")
|
|
assert wps[0].lat == 50.0
|
|
assert wps[1].lon == 40.0
|
|
|
|
def test_delete_flight_cascades(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
db.insert_waypoint("f_abc123", Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()))
|
|
db.save_flight_state(FlightState(flight_id="f_abc123", state="processing"))
|
|
|
|
assert db.delete_flight("f_abc123") is True
|
|
assert db.get_flight_by_id("f_abc123") is None
|
|
assert len(db.get_waypoints("f_abc123")) == 0
|
|
assert db.load_flight_state("f_abc123") is None
|
|
|
|
def test_high_frequency_waypoint_updates(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
wp = Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow())
|
|
db.insert_waypoint("f_abc123", wp)
|
|
|
|
# Simulate fast updates
|
|
for i in range(10):
|
|
wp.lat = 48.1 + (i * 0.001)
|
|
db.update_waypoint("f_abc123", "w1", wp)
|
|
|
|
# Verify data isn't corrupted and the record still exists
|
|
final_wp = db.get_waypoints("f_abc123")
|
|
assert len(final_wp) == 1
|
|
|
|
# --- Feature 03.02: Processing State Persistence ---
|
|
|
|
def test_flight_state_persistence(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
state = FlightState(flight_id="f_abc123", state="blocked", processed_images=15)
|
|
|
|
assert db.save_flight_state(state) is True
|
|
loaded = db.load_flight_state("f_abc123")
|
|
assert loaded is not None
|
|
assert loaded.state == "blocked"
|
|
assert loaded.processed_images == 15
|
|
|
|
def test_heading_history_operations(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
ts = datetime.utcnow()
|
|
|
|
db.save_heading("f_abc123", 1, 90.5, ts)
|
|
db.save_heading("f_abc123", 2, 91.0, ts + timedelta(seconds=1))
|
|
|
|
latest = db.get_latest_heading("f_abc123")
|
|
assert latest == 91.0 # Verify ordering picks the most recent
|
|
|
|
def test_query_processing_history(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
db.save_flight_state(FlightState(flight_id="f_abc123", state="completed"))
|
|
|
|
# Add a second flight
|
|
sample_flight.flight_id = "f_xyz890"
|
|
db.insert_flight(sample_flight)
|
|
db.save_flight_state(FlightState(flight_id="f_xyz890", state="processing"))
|
|
|
|
# Test status filter
|
|
res = db.query_processing_history({"status": "completed"})
|
|
assert len(res) == 1
|
|
assert res[0].flight_id == "f_abc123"
|
|
|
|
# Test date filter
|
|
now = datetime.utcnow()
|
|
res = db.query_processing_history({"created_after": now - timedelta(hours=1)})
|
|
assert len(res) == 2
|
|
|
|
def test_save_and_get_frame_results(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
ts = datetime.utcnow()
|
|
|
|
fr1 = FrameResult(frame_id=1, gps_center=GPSPoint(lat=48.0, lon=37.0), heading=90.0, confidence=0.8, timestamp=ts)
|
|
fr2 = FrameResult(frame_id=2, gps_center=GPSPoint(lat=48.1, lon=37.1), heading=90.5, confidence=0.85, timestamp=ts)
|
|
|
|
assert db.save_frame_result("f_abc123", fr1) is True
|
|
assert db.save_frame_result("f_abc123", fr2) is True
|
|
|
|
# Test refinement update
|
|
fr1.refined = True
|
|
fr1.gps_center.lat = 48.05
|
|
assert db.save_frame_result("f_abc123", fr1) is True
|
|
|
|
results = db.get_frame_results("f_abc123")
|
|
assert len(results) == 2
|
|
assert results[0].refined is True
|
|
assert results[0].gps_center.lat == 48.05
|
|
assert results[1].refined is False
|
|
|
|
def test_get_heading_history(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
ts = datetime.utcnow()
|
|
|
|
for i in range(1, 21):
|
|
db.save_heading("f_abc123", i, 90.0 + i, ts + timedelta(seconds=i))
|
|
|
|
# Test limit
|
|
hist = db.get_heading_history("f_abc123", last_n=5)
|
|
assert len(hist) == 5
|
|
assert hist[0].frame_id == 20 # Descending order ensures frame 20 is first
|
|
assert hist[4].frame_id == 16
|
|
|
|
# --- Feature 03.03: Auxiliary Data Persistence ---
|
|
|
|
def test_image_metadata_operations(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
meta = {"original_name": "img_001.jpg", "size": 1024}
|
|
|
|
assert db.save_image_metadata("f_abc123", 1, "/path/img1.jpg", meta) is True
|
|
|
|
path = db.get_image_path("f_abc123", 1)
|
|
assert path == "/path/img1.jpg"
|
|
|
|
retrieved_meta = db.get_image_metadata("f_abc123", 1)
|
|
assert retrieved_meta["size"] == 1024
|
|
|
|
assert db.get_image_path("f_abc123", 999) is None
|
|
assert db.get_image_metadata("f_abc123", 999) is None
|
|
|
|
def test_chunk_state_persistence(self, db, sample_flight):
|
|
db.insert_flight(sample_flight)
|
|
|
|
chunk = ChunkHandle(
|
|
chunk_id="chunk_1",
|
|
start_frame_id=1,
|
|
end_frame_id=10,
|
|
frames=[1,2,3,4,5,6,7,8,9,10],
|
|
has_anchor=True,
|
|
anchor_gps=GPSPoint(lat=48.0, lon=37.0)
|
|
)
|
|
|
|
assert db.save_chunk_state("f_abc123", chunk) is True
|
|
|
|
loaded_chunks = db.load_chunk_states("f_abc123")
|
|
assert len(loaded_chunks) == 1
|
|
assert loaded_chunks[0].chunk_id == "chunk_1"
|
|
assert loaded_chunks[0].frames == [1,2,3,4,5,6,7,8,9,10]
|
|
assert loaded_chunks[0].anchor_gps.lat == 48.0
|
|
|
|
# Update chunk (e.g. merge operation)
|
|
chunk.frames.extend([11, 12])
|
|
assert db.save_chunk_state("f_abc123", chunk) is True
|
|
loaded_chunks = db.load_chunk_states("f_abc123")
|
|
assert loaded_chunks[0].frames == [1,2,3,4,5,6,7,8,9,10,11,12]
|
|
|
|
# Delete chunk
|
|
assert db.delete_chunk_state("f_abc123", "chunk_1") is True
|
|
assert len(db.load_chunk_states("f_abc123")) == 0
|
|
|
|
# Delete non-existent
|
|
assert db.delete_chunk_state("f_abc123", "chunk_1") is False |