# Source file: gps-denied-onboard/test_f03_flight_database.py
# Last commit: d7e1066c60 "Initial commit" by Denys Zaitsev, 2026-04-03 23:25:54 +03:00
# Size: 260 lines, 10 KiB (Python)

import pytest
import time
import threading
from datetime import datetime, timedelta
from unittest.mock import Mock
from f02_1_flight_lifecycle_manager import Flight, GPSPoint, CameraParameters, Waypoint, FlightState
from f03_flight_database import FlightDatabase, ChunkHandle, FrameResult
@pytest.fixture
def db():
    """Build a ``FlightDatabase`` bound to an in-memory SQLite URL.

    The fixture uses pytest's default (per-test) scope, so each test that
    requests it gets its own instance.
    """
    in_memory_db = FlightDatabase("sqlite:///:memory:")
    return in_memory_db
@pytest.fixture
def sample_flight():
    """Return the ``Flight`` record (id ``f_abc123``) that the tests insert and query."""
    origin = GPSPoint(lat=48.0, lon=37.0)
    camera = CameraParameters(
        focal_length_mm=25,
        sensor_width_mm=23.5,
        resolution={"width": 6000, "height": 4000},
    )
    return Flight(
        flight_id="f_abc123",
        flight_name="Test DB Mission",
        start_gps=origin,
        altitude_m=400.0,
        camera_params=camera,
    )
class TestFlightDatabase:
    """Tests for ``FlightDatabase`` covering CRUD, state and auxiliary data.

    Each test takes the ``db`` and ``sample_flight`` fixtures. Assertions on
    multi-row query results sort by a natural key first, so they do not depend
    on the order in which the database happens to return rows.
    """

    # --- Feature 03.01: CRUD Operations & Transactions ---

    def test_insert_and_get_flight(self, db, sample_flight):
        """An inserted flight can be fetched back by the id that insert returns."""
        flight_id = db.insert_flight(sample_flight)
        assert flight_id == "f_abc123"

        fetched = db.get_flight_by_id(flight_id)
        assert fetched is not None
        assert fetched.flight_name == "Test DB Mission"
        assert fetched.start_gps.lat == 48.0

    def test_internal_helpers(self, db, sample_flight):
        """Exercise the private retry helper and the flight query filters."""
        db.insert_flight(sample_flight)

        # Test _execute_with_retry: the first call raises, the second returns,
        # so exactly two attempts are expected out of the three allowed.
        mock_op = Mock(side_effect=[Exception("Transient"), "Success"])
        result = db._execute_with_retry(mock_op, retries=3)
        assert result == "Success"
        assert mock_op.call_count == 2

        # Test query filters
        db.save_flight_state(
            FlightState(flight_id=sample_flight.flight_id, state="active")
        )
        flights = db.query_flights({"status": "active", "name": "Test%"}, limit=10)
        assert len(flights) == 1

        flights_empty = db.query_flights({"status": "completed"}, limit=10)
        assert len(flights_empty) == 0

    def test_insert_duplicate_flight_raises_error(self, db, sample_flight):
        """Inserting the same flight twice raises ``ValueError``."""
        db.insert_flight(sample_flight)
        with pytest.raises(ValueError, match="Duplicate flight or integrity error"):
            db.insert_flight(sample_flight)

    def test_update_flight(self, db, sample_flight):
        """A renamed flight is persisted by ``update_flight``."""
        db.insert_flight(sample_flight)
        sample_flight.flight_name = "Updated Mission"

        success = db.update_flight(sample_flight)
        assert success is True

        fetched = db.get_flight_by_id("f_abc123")
        assert fetched.flight_name == "Updated Mission"

    def test_execute_transaction_commit(self, db, sample_flight):
        """Both operations of a successful transaction are visible afterwards."""
        db.insert_flight(sample_flight)

        def update_op1():
            db.insert_waypoint(
                "f_abc123",
                Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()),
            )

        def update_op2():
            db.insert_waypoint(
                "f_abc123",
                Waypoint(id="w2", lat=48.2, lon=37.2, confidence=0.9, timestamp=datetime.utcnow()),
            )

        success = db.execute_transaction([update_op1, update_op2])
        assert success is True
        assert len(db.get_waypoints("f_abc123")) == 2

    def test_execute_transaction_rollback(self, db, sample_flight):
        """A failing operation makes the transaction report failure and leave no rows."""
        db.insert_flight(sample_flight)

        def update_op1():
            db.insert_waypoint(
                "f_abc123",
                Waypoint(id="w3", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()),
            )

        def fail_op2():
            raise Exception("Simulated Failure")

        success = db.execute_transaction([update_op1, fail_op2])
        assert success is False
        # Verify atomicity: w3 should NOT be in the database because of the rollback
        assert len(db.get_waypoints("f_abc123")) == 0

    def test_batch_update_waypoints(self, db, sample_flight):
        """``batch_update_waypoints`` persists the edits made to each waypoint."""
        db.insert_flight(sample_flight)
        wp1 = Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow())
        wp2 = Waypoint(id="w2", lat=48.2, lon=37.2, confidence=0.9, timestamp=datetime.utcnow())
        db.insert_waypoint("f_abc123", wp1)
        db.insert_waypoint("f_abc123", wp2)

        # Modify waypoints
        wp1.lat = 50.0
        wp2.lon = 40.0
        res = db.batch_update_waypoints("f_abc123", [wp1, wp2])
        assert res.success is True
        assert res.updated_count == 2

        # The two timestamps may be identical, so sort by id instead of relying
        # on whatever order the query returns the rows in.
        wps = sorted(db.get_waypoints("f_abc123"), key=lambda w: w.id)
        assert len(wps) == 2
        assert wps[0].lat == 50.0
        assert wps[1].lon == 40.0

    def test_delete_flight_cascades(self, db, sample_flight):
        """Deleting a flight also removes its waypoints and saved state."""
        db.insert_flight(sample_flight)
        db.insert_waypoint(
            "f_abc123",
            Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow()),
        )
        db.save_flight_state(FlightState(flight_id="f_abc123", state="processing"))

        assert db.delete_flight("f_abc123") is True
        assert db.get_flight_by_id("f_abc123") is None
        assert len(db.get_waypoints("f_abc123")) == 0
        assert db.load_flight_state("f_abc123") is None

    def test_high_frequency_waypoint_updates(self, db, sample_flight):
        """Ten back-to-back updates leave one row holding the last written latitude."""
        db.insert_flight(sample_flight)
        wp = Waypoint(id="w1", lat=48.1, lon=37.1, confidence=0.9, timestamp=datetime.utcnow())
        db.insert_waypoint("f_abc123", wp)

        # Simulate fast updates
        for i in range(10):
            wp.lat = 48.1 + (i * 0.001)
            db.update_waypoint("f_abc123", "w1", wp)

        # Verify data isn't corrupted and the record still exists
        final_wp = db.get_waypoints("f_abc123")
        assert len(final_wp) == 1
        # wp.lat still holds the value from the last loop iteration.
        assert final_wp[0].lat == pytest.approx(wp.lat)

    # --- Feature 03.02: Processing State Persistence ---

    def test_flight_state_persistence(self, db, sample_flight):
        """A saved ``FlightState`` loads back with the same state and counter."""
        db.insert_flight(sample_flight)
        state = FlightState(flight_id="f_abc123", state="blocked", processed_images=15)
        assert db.save_flight_state(state) is True

        loaded = db.load_flight_state("f_abc123")
        assert loaded is not None
        assert loaded.state == "blocked"
        assert loaded.processed_images == 15

    def test_heading_history_operations(self, db, sample_flight):
        """``get_latest_heading`` returns the heading with the newest timestamp."""
        db.insert_flight(sample_flight)
        ts = datetime.utcnow()
        db.save_heading("f_abc123", 1, 90.5, ts)
        db.save_heading("f_abc123", 2, 91.0, ts + timedelta(seconds=1))

        latest = db.get_latest_heading("f_abc123")
        assert latest == 91.0  # Verify ordering picks the most recent

    def test_query_processing_history(self, db, sample_flight):
        """Processing history can be filtered by status and by creation time."""
        db.insert_flight(sample_flight)
        db.save_flight_state(FlightState(flight_id="f_abc123", state="completed"))

        # Add a second flight by re-using the fixture object under a new id
        sample_flight.flight_id = "f_xyz890"
        db.insert_flight(sample_flight)
        db.save_flight_state(FlightState(flight_id="f_xyz890", state="processing"))

        # Test status filter
        res = db.query_processing_history({"status": "completed"})
        assert len(res) == 1
        assert res[0].flight_id == "f_abc123"

        # Test date filter
        now = datetime.utcnow()
        res = db.query_processing_history({"created_after": now - timedelta(hours=1)})
        assert len(res) == 2

    def test_save_and_get_frame_results(self, db, sample_flight):
        """Re-saving a frame result updates it in place instead of adding a row."""
        db.insert_flight(sample_flight)
        ts = datetime.utcnow()
        fr1 = FrameResult(
            frame_id=1,
            gps_center=GPSPoint(lat=48.0, lon=37.0),
            heading=90.0,
            confidence=0.8,
            timestamp=ts,
        )
        fr2 = FrameResult(
            frame_id=2,
            gps_center=GPSPoint(lat=48.1, lon=37.1),
            heading=90.5,
            confidence=0.85,
            timestamp=ts,
        )
        assert db.save_frame_result("f_abc123", fr1) is True
        assert db.save_frame_result("f_abc123", fr2) is True

        # Test refinement update
        fr1.refined = True
        fr1.gps_center.lat = 48.05
        assert db.save_frame_result("f_abc123", fr1) is True

        # Both frames share one timestamp and frame 1 was written twice, so
        # sort by frame_id to make the positional checks below deterministic.
        results = sorted(db.get_frame_results("f_abc123"), key=lambda r: r.frame_id)
        assert len(results) == 2
        assert results[0].refined is True
        assert results[0].gps_center.lat == 48.05
        assert results[1].refined is False

    def test_get_heading_history(self, db, sample_flight):
        """``get_heading_history`` returns the newest ``last_n`` entries, newest first."""
        db.insert_flight(sample_flight)
        ts = datetime.utcnow()
        for i in range(1, 21):
            db.save_heading("f_abc123", i, 90.0 + i, ts + timedelta(seconds=i))

        # Test limit
        hist = db.get_heading_history("f_abc123", last_n=5)
        assert len(hist) == 5
        assert hist[0].frame_id == 20  # Descending order ensures frame 20 is first
        assert hist[4].frame_id == 16

    # --- Feature 03.03: Auxiliary Data Persistence ---

    def test_image_metadata_operations(self, db, sample_flight):
        """Image path and metadata round-trip; unknown frames yield ``None``."""
        db.insert_flight(sample_flight)
        meta = {"original_name": "img_001.jpg", "size": 1024}
        assert db.save_image_metadata("f_abc123", 1, "/path/img1.jpg", meta) is True

        path = db.get_image_path("f_abc123", 1)
        assert path == "/path/img1.jpg"

        retrieved_meta = db.get_image_metadata("f_abc123", 1)
        assert retrieved_meta["size"] == 1024

        assert db.get_image_path("f_abc123", 999) is None
        assert db.get_image_metadata("f_abc123", 999) is None

    def test_chunk_state_persistence(self, db, sample_flight):
        """Chunk state can be saved, updated in place, loaded and deleted."""
        db.insert_flight(sample_flight)
        chunk = ChunkHandle(
            chunk_id="chunk_1",
            start_frame_id=1,
            end_frame_id=10,
            frames=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            has_anchor=True,
            anchor_gps=GPSPoint(lat=48.0, lon=37.0),
        )
        assert db.save_chunk_state("f_abc123", chunk) is True

        loaded_chunks = db.load_chunk_states("f_abc123")
        assert len(loaded_chunks) == 1
        assert loaded_chunks[0].chunk_id == "chunk_1"
        assert loaded_chunks[0].frames == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        assert loaded_chunks[0].anchor_gps.lat == 48.0

        # Update chunk (e.g. merge operation)
        chunk.frames.extend([11, 12])
        assert db.save_chunk_state("f_abc123", chunk) is True
        loaded_chunks = db.load_chunk_states("f_abc123")
        # Saving again under the same chunk_id must not create a second row.
        assert len(loaded_chunks) == 1
        assert loaded_chunks[0].frames == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

        # Delete chunk
        assert db.delete_chunk_state("f_abc123", "chunk_1") is True
        assert len(db.load_chunk_states("f_abc123")) == 0

        # Delete non-existent
        assert db.delete_chunk_state("f_abc123", "chunk_1") is False