Files
gps-denied-onboard/h05_performance_monitor.py
Denys Zaitsev d7e1066c60 Initial commit
2026-04-03 23:25:54 +03:00

66 lines
2.3 KiB
Python

import time
import statistics
import uuid
import logging
from typing import Dict, List, Tuple
from pydantic import BaseModel
from abc import ABC, abstractmethod
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class PerformanceStats(BaseModel):
operation: str
count: int
mean: float
p50: float
p95: float
p99: float
max: float
class IPerformanceMonitor(ABC):
@abstractmethod
def start_timer(self, operation: str) -> str: pass
@abstractmethod
def end_timer(self, timer_id: str) -> float: pass
@abstractmethod
def get_statistics(self, operation: str) -> PerformanceStats: pass
@abstractmethod
def check_sla(self, operation: str, threshold: float) -> bool: pass
@abstractmethod
def get_bottlenecks(self) -> List[Tuple[str, float]]: pass
class PerformanceMonitor(IPerformanceMonitor):
"""H05: Tracks processing times, ensures <5s constraint per frame."""
def __init__(self, ac7_limit_s: float = 5.0):
self.ac7_limit_s = ac7_limit_s
self._timers: Dict[str, Tuple[str, float]] = {}
self._history: Dict[str, List[float]] = {}
def start_timer(self, operation: str) -> str:
timer_id = str(uuid.uuid4())
self._timers[timer_id] = (operation, time.time())
return timer_id
def end_timer(self, timer_id: str) -> float:
if timer_id not in self._timers: return 0.0
operation, start_time = self._timers.pop(timer_id)
duration = time.time() - start_time
self._history.setdefault(operation, []).append(duration)
return duration
@contextmanager
def measure(self, operation: str, limit_ms: float = 0.0):
timer_id = self.start_timer(operation)
try:
yield
finally:
duration = self.end_timer(timer_id)
threshold = limit_ms / 1000.0 if limit_ms > 0 else self.ac7_limit_s
if duration > threshold:
logger.warning(f"SLA Violation: {operation} took {duration:.3f}s (Threshold: {threshold:.3f}s)")
def get_statistics(self, operation: str) -> PerformanceStats:
return PerformanceStats(operation=operation, count=0, mean=0.0, p50=0.0, p95=0.0, p99=0.0, max=0.0)
def check_sla(self, operation: str, threshold: float) -> bool: return True
def get_bottlenecks(self) -> List[Tuple[str, float]]: return []