Files
gps-denied-onboard/simulate_flight.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

93 lines
3.2 KiB
Python

import os
import time
import re
import httpx
API_BASE = "http://localhost:8000/api/v1/flights"
DATA_DIR = "docs/00_problem/input_data"
BATCH_SIZE = 10
# --- Configuration from data_parameters.md ---
CAMERA_PARAMS = {
"focal_length_mm": 25.0,
"sensor_width_mm": 23.5,
"resolution": {"width": 6252, "height": 4168}
}
ALTITUDE_M = 400.0
# IMPORTANT: Replace these with the actual starting coordinates from your CSV
START_LAT = 48.275292
START_LON = 37.385220
def get_sequence_from_filename(filename: str) -> int:
"""Extracts the integer sequence from filenames like AD000001.jpg"""
match = re.search(r'AD(\d{6})\.jpg', filename, re.IGNORECASE)
return int(match.group(1)) if match else 0
def main():
print("==================================================")
print("AURA-GEOFUSE: Real Data Flight Simulation")
print("==================================================")
# 1. Create the Flight
print("\n[1/4] Initializing Flight...")
flight_payload = {
"name": "Real Data Test Flight",
"start_gps": {"lat": START_LAT, "lon": START_LON},
"altitude": ALTITUDE_M,
"camera_params": CAMERA_PARAMS
}
with httpx.Client(timeout=30.0) as client:
resp = client.post(API_BASE, json=flight_payload)
resp.raise_for_status()
flight_id = resp.json()["flight_id"]
print(f"✅ Flight created successfully! ID: {flight_id}")
# 2. Gather and sort images
print(f"\n[2/4] Scanning '{DATA_DIR}' for images...")
all_files = [f for f in os.listdir(DATA_DIR) if f.lower().endswith(".jpg")]
all_files.sort(key=get_sequence_from_filename)
if not all_files:
print(f"❌ No .jpg files found in {DATA_DIR}!")
return
print(f"Found {len(all_files)} images. Starting batch upload...")
# 3. Upload in batches
print("\n[3/4] Uploading Image Batches...")
batch_num = 1
for i in range(0, len(all_files), BATCH_SIZE):
batch_files = all_files[i:i+BATCH_SIZE]
start_seq = get_sequence_from_filename(batch_files[0])
end_seq = get_sequence_from_filename(batch_files[-1])
# Prepare multipart form data
files_payload = []
file_handles = []
for fname in batch_files:
fpath = os.path.join(DATA_DIR, fname)
fh = open(fpath, "rb")
file_handles.append(fh)
files_payload.append(("images", (fname, fh, "image/jpeg")))
data_payload = {
"start_sequence": start_seq,
"end_sequence": end_seq,
"batch_number": batch_num
}
print(f" -> Sending Batch {batch_num} (Seq {start_seq} to {end_seq})...")
batch_resp = client.post(f"{API_BASE}/{flight_id}/images/batch", data=data_payload, files=files_payload)
for fh in file_handles:
fh.close()
batch_resp.raise_for_status()
batch_num += 1
print("\n[4/4] ✅ All images uploaded! Check the server logs for processing status.")
if __name__ == "__main__":
main()