mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-04-22 09:26:38 +00:00
66 lines
2.3 KiB
Python
66 lines
2.3 KiB
Python
import time
|
|
import statistics
|
|
import uuid
|
|
import logging
|
|
from typing import Dict, List, Tuple
|
|
from pydantic import BaseModel
|
|
from abc import ABC, abstractmethod
|
|
from contextlib import contextmanager
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class PerformanceStats(BaseModel):
    """Summary figures describing the timing samples of one operation.

    The model only declares the fields; it does not fix the unit of the
    duration values (presumably seconds, matching what the monitor's
    timers return -- confirm against the producer of this model).
    """

    # Name of the operation the figures belong to.
    operation: str
    # Number of samples behind the figures.
    count: int
    # Average duration.
    mean: float
    # 50th percentile (median) duration.
    p50: float
    # 95th percentile duration.
    p95: float
    # 99th percentile duration.
    p99: float
    # Largest duration.
    max: float
|
|
|
|
class IPerformanceMonitor(ABC):
    """Abstract interface of a per-operation timing monitor.

    Concrete monitors must provide every method below; the abstract
    versions carry no behavior of their own.
    """

    @abstractmethod
    def start_timer(self, operation: str) -> str:
        """Start a timer for ``operation`` and return its timer id."""

    @abstractmethod
    def end_timer(self, timer_id: str) -> float:
        """Stop the timer identified by ``timer_id`` and return a duration."""

    @abstractmethod
    def get_statistics(self, operation: str) -> PerformanceStats:
        """Return a ``PerformanceStats`` summary for ``operation``."""

    @abstractmethod
    def check_sla(self, operation: str, threshold: float) -> bool:
        """Report whether ``operation`` satisfies the given ``threshold``."""

    @abstractmethod
    def get_bottlenecks(self) -> List[Tuple[str, float]]:
        """Return ``(operation, value)`` pairs identifying slow operations."""
|
|
|
|
class PerformanceMonitor(IPerformanceMonitor):
    """H05: Tracks processing times, ensures <5s constraint per frame.

    Timers are opened with :meth:`start_timer` and closed with
    :meth:`end_timer`. Each completed duration (in seconds) is appended to
    a per-operation history, from which :meth:`get_statistics`,
    :meth:`check_sla` and :meth:`get_bottlenecks` are computed.

    Durations are taken from ``time.perf_counter()``, a monotonic clock, so
    system clock adjustments cannot produce negative or inflated samples.
    """

    def __init__(self, ac7_limit_s: float = 5.0):
        """Create a monitor.

        Args:
            ac7_limit_s: Default threshold in seconds used by
                :meth:`measure` when no explicit ``limit_ms`` is given.
        """
        self.ac7_limit_s = ac7_limit_s
        # timer_id -> (operation name, perf_counter value at start)
        self._timers: Dict[str, Tuple[str, float]] = {}
        # operation name -> completed durations in seconds, in arrival order
        self._history: Dict[str, List[float]] = {}

    def start_timer(self, operation: str) -> str:
        """Start timing ``operation``.

        Returns:
            A unique timer id (UUID4 string) to pass to :meth:`end_timer`.
        """
        timer_id = str(uuid.uuid4())
        self._timers[timer_id] = (operation, time.perf_counter())
        return timer_id

    def end_timer(self, timer_id: str) -> float:
        """Stop a timer and record its duration.

        Args:
            timer_id: Id previously returned by :meth:`start_timer`.

        Returns:
            Elapsed seconds, or ``0.0`` when ``timer_id`` is unknown (never
            started or already ended); nothing is recorded in that case.
        """
        # Single pop-with-default instead of a membership test plus pop.
        entry = self._timers.pop(timer_id, None)
        if entry is None:
            return 0.0
        operation, started_at = entry
        duration = time.perf_counter() - started_at
        self._history.setdefault(operation, []).append(duration)
        return duration

    @contextmanager
    def measure(self, operation: str, limit_ms: float = 0.0):
        """Context manager that times its body as ``operation``.

        The duration is recorded even when the body raises. A warning is
        logged when the duration exceeds the threshold.

        Args:
            operation: Name under which the duration is recorded.
            limit_ms: Threshold in milliseconds; values ``<= 0`` select the
                monitor-wide ``ac7_limit_s`` instead.
        """
        timer_id = self.start_timer(operation)
        try:
            yield
        finally:
            duration = self.end_timer(timer_id)
            threshold = limit_ms / 1000.0 if limit_ms > 0 else self.ac7_limit_s
            if duration > threshold:
                # Lazy %-style arguments: the message is only formatted
                # when the warning is actually emitted.
                logger.warning(
                    "SLA Violation: %s took %.3fs (Threshold: %.3fs)",
                    operation,
                    duration,
                    threshold,
                )

    @staticmethod
    def _percentile(ordered: List[float], pct: float) -> float:
        """Return the ``pct``-th percentile of a non-empty, sorted list.

        Uses linear interpolation between the two closest ranks.
        """
        last = len(ordered) - 1
        rank = last * pct / 100.0
        lower = int(rank)
        upper = min(lower + 1, last)
        return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)

    def get_statistics(self, operation: str) -> PerformanceStats:
        """Summarize the recorded durations of ``operation``.

        Returns:
            A ``PerformanceStats`` with all values in seconds. When no
            sample has been recorded, ``count`` is 0 and every figure is 0.0.
        """
        samples = sorted(self._history.get(operation, ()))
        if not samples:
            return PerformanceStats(
                operation=operation,
                count=0,
                mean=0.0,
                p50=0.0,
                p95=0.0,
                p99=0.0,
                max=0.0,
            )
        return PerformanceStats(
            operation=operation,
            count=len(samples),
            mean=statistics.mean(samples),
            p50=self._percentile(samples, 50.0),
            p95=self._percentile(samples, 95.0),
            p99=self._percentile(samples, 99.0),
            max=samples[-1],
        )

    def check_sla(self, operation: str, threshold: float) -> bool:
        """Report whether ``operation`` stays within ``threshold``.

        Args:
            operation: Operation name to check.
            threshold: Limit in seconds (the unit of the recorded samples).

        Returns:
            ``True`` when the 95th percentile of the recorded durations is
            at most ``threshold``, or when nothing has been recorded yet.
        """
        # NOTE(review): p95 is used as the SLA statistic -- confirm this
        # matches the intended SLA definition.
        samples = self._history.get(operation)
        if not samples:
            return True
        return self._percentile(sorted(samples), 95.0) <= threshold

    def get_bottlenecks(self) -> List[Tuple[str, float]]:
        """Rank operations by mean duration, slowest first.

        Returns:
            ``(operation, mean_seconds)`` pairs for every operation with at
            least one sample; an empty list when nothing was recorded.
        """
        # NOTE(review): ranking by mean over all operations -- confirm
        # whether callers expect a different statistic or a cut-off.
        ranked = [
            (operation, statistics.mean(samples))
            for operation, samples in self._history.items()
            if samples
        ]
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked