from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase from models.config import DatabaseConfig class Base(DeclarativeBase): pass _engine = None _session_factory = None def get_engine(config: DatabaseConfig): global _engine if _engine is None: url = f"postgresql+asyncpg://{config.username}:{config.password}@{config.host}:{config.port}/{config.database}" _engine = create_async_engine( url, pool_size=config.pool_size, max_overflow=config.max_overflow, pool_timeout=config.pool_timeout, pool_recycle=config.pool_recycle, ) return _engine def get_session_factory(config: DatabaseConfig): global _session_factory if _session_factory is None: engine = get_engine(config) _session_factory = async_sessionmaker(engine, expire_on_commit=False) return _session_factory async def get_session(config: DatabaseConfig) -> AsyncGenerator[AsyncSession, None]: factory = get_session_factory(config) async with factory() as session: yield session async def init_db(config: DatabaseConfig) -> None: engine = get_engine(config) async with engine.begin() as conn: from .models import Base await conn.run_sync(Base.metadata.create_all)