Source code for veupath_chatbot.persistence.session

"""SQLAlchemy async engine and session management."""

from collections.abc import AsyncGenerator

from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.errors import InternalError

_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None


def _get_engine() -> AsyncEngine:
    """Lazily create the async engine on first access."""
    global _engine
    if _engine is not None:
        return _engine

    settings = get_settings()
    db_url = make_url(settings.database_url)

    if not db_url.drivername.startswith("postgresql"):
        raise InternalError(
            title="Unsupported database configuration",
            detail=(
                "SQLite is no longer supported. Set DATABASE_URL to a PostgreSQL URL, e.g. "
                "'postgresql+asyncpg://postgres:postgres@localhost:5432/pathfinder'."
            ),
        )

    _engine = create_async_engine(
        settings.database_url,
        echo=settings.is_development,
        pool_pre_ping=True,
        pool_size=5,
        max_overflow=10,
    )
    return _engine


def _get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Lazily create the session factory on first access."""
    global _session_factory
    if _session_factory is not None:
        return _session_factory

    _session_factory = async_sessionmaker(
        _get_engine(),
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=False,
    )
    return _session_factory


[docs] def get_engine() -> AsyncEngine: """Get the lazily-initialized async engine.""" return _get_engine()
[docs] def async_session_factory() -> AsyncSession: """Create a new async session from the lazily-initialized factory.""" return _get_session_factory()()
[docs] async def get_db_session() -> AsyncGenerator[AsyncSession]: """Dependency to get database session.""" async with _get_session_factory()() as session: try: yield session await session.commit() except Exception: await session.rollback() raise
[docs] async def init_db() -> None: """Initialize database — creates all tables from ORM models.""" from veupath_chatbot.persistence.models import Base real_engine = _get_engine() async with real_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
[docs] async def close_db() -> None: """Close database connections.""" global _engine, _session_factory if _engine is not None: await _engine.dispose() _engine = None _session_factory = None