Source code for veupath_chatbot.persistence.repositories.user
"""User repository."""
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from veupath_chatbot.persistence.models import User
[docs]
class UserRepository:
"""User CRUD operations."""
[docs]
def __init__(self, session: AsyncSession) -> None:
self.session = session
[docs]
async def get_by_id(self, user_id: UUID) -> User | None:
"""Get user by ID."""
return await self.session.get(User, user_id)
[docs]
async def get_or_create(self, user_id: UUID) -> User:
"""Get existing user or create new one."""
# Avoid race conditions when multiple concurrent requests attempt to create
# the same user (e.g. initial page load triggering parallel API calls).
stmt = (
pg_insert(User)
.values(id=user_id)
.on_conflict_do_nothing(index_elements=[User.id])
)
await self.session.execute(stmt)
user = await self.get_by_id(user_id)
if user is None:
# Should be impossible, but keep behavior explicit.
user = User(id=user_id)
self.session.add(user)
await self.session.flush()
return user
[docs]
async def get_or_create_by_external_id(self, external_id: str) -> User:
"""Lookup a user by external identity (e.g. VEuPathDB email).
Creates a new row if none exists yet, avoiding race conditions with
``INSERT ... ON CONFLICT``.
"""
stmt = (
pg_insert(User)
.values(external_id=external_id)
.on_conflict_do_nothing(index_elements=[User.external_id])
)
await self.session.execute(stmt)
result = await self.session.execute(
select(User).where(User.external_id == external_id)
)
user = result.scalar_one_or_none()
if user is None:
user = User(external_id=external_id)
self.session.add(user)
await self.session.flush()
return user
[docs]
async def create(self, external_id: str | None = None) -> User:
"""Create a new user."""
user = User(external_id=external_id)
self.session.add(user)
await self.session.flush()
return user