9.6 KiB
Result Manager
Interface Definition
Interface Name: IResultManager
Interface Methods
class IResultManager(ABC):
@abstractmethod
def update_frame_result(self, flight_id: str, frame_id: int, result: FrameResult) -> bool:
"""
Atomic update:
1. Saves result to frame_results table.
2. Updates waypoint in waypoints table.
3. All within a single transaction via F03.
"""
pass
@abstractmethod
def publish_waypoint_update(self, flight_id: str, frame_id: int) -> bool:
pass
@abstractmethod
def get_flight_results(self, flight_id: str) -> FlightResults:
pass
@abstractmethod
def mark_refined(self, flight_id: str, refined_results: List[RefinedFrameResult]) -> bool:
"""
Updates results for frames that have been retrospectively improved.
Args:
flight_id: Flight identifier
refined_results: List of RefinedFrameResult containing frame_id and GPS-converted coordinates
(caller F02.2 converts ENU to GPS before calling this method)
"""
pass
@abstractmethod
def get_changed_frames(self, flight_id: str, since: datetime) -> List[int]:
pass
@abstractmethod
def update_results_after_chunk_merge(self, flight_id: str, refined_results: List[RefinedFrameResult]) -> bool:
"""
Updates results for frames affected by chunk merge.
Args:
flight_id: Flight identifier
refined_results: List of RefinedFrameResult with GPS-converted coordinates
(caller F02.2 converts ENU to GPS before calling this method)
"""
pass
Component Description
Responsibilities
- Result consistency and publishing.
- Atomic Updates: Ensures consistency between normalized
waypointsand denormalizedframe_resultsvia transaction requests to F03.
Scope
- Result state management
- Flight Database integration (waypoint storage)
- SSE event triggering
- Incremental update detection
- Result persistence
API Methods
update_frame_result(flight_id: str, frame_id: int, result: FrameResult) -> bool
Description: Persists and publishes the result of a processed frame.
Called By:
- Main processing loop (after each frame)
- F10 Factor Graph (after refinement)
Input:
flight_id: str
frame_id: int
result: FrameResult:
gps_center: GPSPoint
altitude: float
heading: float
confidence: float
timestamp: datetime
refined: bool
objects: List[ObjectLocation] # From external detector
Output: bool - True if updated
Processing Flow:
- Construct DB transaction:
- Insert/Update
frame_results. - Update
waypoints(latest position).
- Insert/Update
- Call
F03.execute_transaction(). - If success: call
F15.send_frame_result(). - Update flight statistics.
Test Cases:
- New frame result → stored and published
- Refined result → updates existing, marks refined=True
publish_waypoint_update(flight_id: str, frame_id: int) -> bool
Description: Specifically triggers an update for the waypoint visualization.
Called By:
- Internal (after update_frame_result)
Input:
flight_id: str
frame_id: int
Output: bool - True if updated successfully
Processing Flow:
- Fetch latest data.
- Call
F15.send_frame_result()(or a specific lightweight event). - Handle errors (retry if transient)
Test Cases:
- Successful update → Waypoint stored in database
- Database unavailable → logs error, continues
get_flight_results(flight_id: str) -> FlightResults
Description: Retrieves all results for a flight.
Called By:
- F01 REST API (results endpoint)
- Testing/validation
Input: flight_id: str
Output:
FlightResults:
flight_id: str
frames: List[FrameResult]
statistics: FlightStatistics
Test Cases:
- Get all results → returns complete trajectory
mark_refined(flight_id: str, refined_results: List[RefinedFrameResult]) -> bool
Description: Updates results for frames that have been retrospectively improved (e.g., after loop closure or chunk merge).
Called By:
- F02.2 Flight Processing Engine (after factor graph optimization)
Input:
flight_id: str
refined_results: List[RefinedFrameResult] # GPS-converted results from F02.2
Output: bool
Important: F14 does NOT call F10 or F13. The caller (F02.2) performs the following steps before calling mark_refined():
- F02.2 gets refined poses from F10.get_trajectory(flight_id) (ENU coordinates)
- F02.2 converts ENU to GPS via F13.enu_to_gps(flight_id, enu_tuple)
- F02.2 constructs RefinedFrameResult objects with GPS coordinates
- F02.2 calls F14.mark_refined() with the GPS-converted results
Processing Flow:
- For each refined_result in refined_results:
- Extract frame_id, gps_center, confidence from RefinedFrameResult
- Update result with refined=True via F03 Flight Database (part of transaction)
- Update waypoint via F03 Flight Database (part of transaction)
- Call F15 SSE Event Streamer.send_refinement()
Test Cases:
- Batch refinement → all frames updated and published
- GPS coordinates match converted values
get_changed_frames(flight_id: str, since: datetime) -> List[int]
Description: Gets frames changed since timestamp (for incremental updates).
Called By:
- F15 SSE Event Streamer (for reconnection replay)
Input:
flight_id: str
since: datetime
Output: List[int] - Frame IDs changed since timestamp
Test Cases:
- Get changes → returns only modified frames
- No changes → returns empty list
update_results_after_chunk_merge(flight_id: str, refined_results: List[RefinedFrameResult]) -> bool
Description: Updates results for frames affected by chunk merging into main trajectory.
Called By:
- F02.2 Flight Processing Engine (after chunk merge completes)
Input:
flight_id: str
refined_results: List[RefinedFrameResult] # GPS-converted results for merged frames
Output: bool - True if updated successfully
Important: F14 does NOT call F10, F13, or F11. F02.2 coordinates the chunk merge workflow:
- F11 returns merge result to F02.2 (direct call return, not event)
- F02.2 gets updated poses from F10.get_trajectory(flight_id) (ENU coordinates)
- F02.2 converts ENU to GPS via F13.enu_to_gps(flight_id, enu_tuple)
- F02.2 constructs RefinedFrameResult objects
- F02.2 calls F14.update_results_after_chunk_merge() with GPS-converted results
Processing Flow:
- For each refined_result in refined_results:
- Extract frame_id, gps_center, confidence from RefinedFrameResult
- Update result with refined=True via F03 Flight Database (part of transaction)
- Update waypoint via F03 Flight Database (part of transaction)
- Call F15 SSE Event Streamer.send_refinement()
Test Cases:
- Chunk merge updates: All merged frames updated and published
- GPS accuracy: Updated GPS matches optimized poses
Integration Tests
Test 1: Per-Frame Processing
- Process frame 237
- update_frame_result() → stores result
- Verify publish_waypoint_update() called (F03.update_waypoint())
- Verify F15 SSE event sent
Test 2: Batch Refinement
- Process 100 frames
- Factor graph refines frames 10-50
- mark_refined([10-50]) → updates all
- Verify Flight Database updated (F03.batch_update_waypoints())
- Verify SSE refinement events sent
Test 3: Incremental Updates
- Process frames 1-100
- Client disconnects at frame 50
- Client reconnects
- get_changed_frames(since=frame_50_time)
- Client receives frames 51-100
Non-Functional Requirements
Performance
- update_frame_result: < 50ms
- publish_waypoint_update: < 100ms (non-blocking)
- get_flight_results: < 200ms for 2000 frames
Reliability
- Result persistence survives crashes
- Guaranteed at-least-once delivery to Flight Database
- Idempotent updates
Dependencies
Internal Components
- F03 Flight Database: Must support transactional updates.
- F15 SSE Event Streamer: For real-time result streaming.
Note: F14 does NOT depend on F10, F13, or F11. The caller (F02.2) coordinates with those components and provides GPS-converted results to F14. This ensures unidirectional data flow and eliminates circular dependencies.
External Dependencies
- None
Data Models
FrameResult
class ObjectLocation(BaseModel):
object_id: str
pixel: Tuple[float, float]
gps: GPSPoint
class_name: str
confidence: float
class FrameResult(BaseModel):
frame_id: int
gps_center: GPSPoint
altitude: float
heading: float
confidence: float
timestamp: datetime
refined: bool
objects: List[ObjectLocation]
updated_at: datetime
RefinedFrameResult
class RefinedFrameResult(BaseModel):
"""
GPS-converted frame result provided by F02.2 after coordinate transformation.
F02.2 converts ENU poses from F10 to GPS using F13 before passing to F14.
"""
frame_id: int
gps_center: GPSPoint # GPS coordinates (converted from ENU by F02.2)
confidence: float
heading: Optional[float] = None # Updated heading if available
FlightResults
class FlightStatistics(BaseModel):
total_frames: int
processed_frames: int
refined_frames: int
mean_confidence: float
processing_time: float
class FlightResults(BaseModel):
flight_id: str
frames: List[FrameResult]
statistics: FlightStatistics