Source code for veupath_chatbot.transport.http.routers.user_data

"""User data management — purge endpoints."""

import contextlib
from typing import cast

from fastapi import APIRouter, Query
from sqlalchemy import CursorResult, delete, select

from veupath_chatbot.persistence.models import (
    ControlSet,
    ExperimentRow,
    GeneSetRow,
    Stream,
)
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.redis import get_redis
from veupath_chatbot.platform.types import JSONObject
from veupath_chatbot.transport.http.deps import CurrentUser, StreamRepo

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router for the user-data endpoints: every route registered on it is
# served under the ``/api/v1/user`` prefix and carries the ``user`` tag.
router = APIRouter(prefix="/api/v1/user", tags=["user"])


@router.delete("/data")
async def purge_user_data(
    user_id: CurrentUser,
    stream_repo: StreamRepo,
    site_id: str | None = Query(None, alias="siteId"),
    delete_wdk: bool = Query(False, alias="deleteWdk"),
) -> JSONObject:
    """Purge the current user's data from all local stores.

    When ``deleteWdk=false`` (default): no stream is deleted. Every
    projection belonging to the user's streams is **dismissed**
    (``dismissed_at`` is set to the current UTC time) so that WDK sync does
    not re-import it. Nothing is removed from VEuPathDB.

    When ``deleteWdk=true``: all of the user's streams are hard-deleted
    locally AND every WDK strategy returned by the strategy API is deleted
    from VEuPathDB. The remote deletion runs before the local transaction is
    committed and cannot be rolled back.

    Always deletes: gene sets, experiments, control sets, Redis streams.

    Pass ``?siteId=X`` to limit the purge to one site, or omit it to purge
    everything.

    WDK, Redis and cache failures are logged and skipped (best effort);
    database errors propagate to the caller.

    :param user_id: Authenticated user whose data is purged.
    :param stream_repo: Repository providing the database session.
    :param site_id: Optional site filter (query parameter ``siteId``).
    :param delete_wdk: Also delete remote WDK strategies and hard-delete
        local streams (query parameter ``deleteWdk``).
    :returns: ``{"ok": True, "deleted": {...}}`` with per-store counts.
        ``strategies`` is the number of hard-deleted stream rows, or, in
        dismiss mode, the number of streams whose projections were
        dismissed.
    """
    session = stream_repo.session
    redis = get_redis()
    # GeneSetRow / ExperimentRow are filtered by the string form of the id,
    # while Stream / ControlSet are filtered by the id itself.
    uid_str = str(user_id)

    # ── 1. Find all streams (for Redis + WDK cleanup) ──────────────
    stream_query = select(Stream).where(Stream.user_id == user_id)
    if site_id:
        stream_query = stream_query.where(Stream.site_id == site_id)
    streams = (await session.execute(stream_query)).scalars().all()
    stream_ids = [str(s.id) for s in streams]

    # ── 2. Delete WDK strategies (only when explicitly requested) ──
    wdk_deleted = 0
    if delete_wdk:
        from veupath_chatbot.integrations.veupathdb.factory import (
            get_strategy_api,
            list_sites,
        )

        sites_to_purge: set[str] = (
            {site_id} if site_id else {s.id for s in list_sites()}
        )

        for purge_site in sites_to_purge:
            try:
                api = get_strategy_api(purge_site)
                wdk_strategies = await api.list_strategies()
                for wdk_strat in wdk_strategies:
                    if not isinstance(wdk_strat, dict):
                        continue
                    wdk_id = wdk_strat.get("strategyId")
                    if wdk_id is None:
                        continue
                    try:
                        await api.delete_strategy(int(str(wdk_id)))
                        wdk_deleted += 1
                    except Exception as exc:
                        # One failed strategy must not abort the purge.
                        logger.warning(
                            "Failed to delete WDK strategy during user data purge",
                            wdk_strategy_id=wdk_id,
                            site=purge_site,
                            error=str(exc),
                        )
            except Exception as exc:
                # A site that cannot be reached or listed is skipped.
                logger.debug(
                    "WDK purge skipped for site", site=purge_site, error=str(exc)
                )

    # ── 3. Delete Redis streams ────────────────────────────────────
    # Best effort: a Redis failure is logged rather than silently dropped,
    # and never blocks the database purge below.
    redis_deleted = 0
    for sid in stream_ids:
        try:
            redis_deleted += int(await redis.delete(f"stream:{sid}"))
        except Exception as exc:
            logger.warning(
                "Failed to delete Redis stream during user data purge",
                stream_id=sid,
                error=str(exc),
            )

    # ── 4. Handle streams ──────────────────────────────────────────
    dismissed_count = 0
    hard_deleted_count = 0

    if delete_wdk:
        # Nuclear option: hard-delete ALL streams (cascade to projections).
        stream_del = delete(Stream).where(Stream.user_id == user_id)
        if site_id:
            stream_del = stream_del.where(Stream.site_id == site_id)
        sr = cast(CursorResult[object], await session.execute(stream_del))
        hard_deleted_count = sr.rowcount or 0
    else:
        from datetime import UTC, datetime

        from sqlalchemy import update

        from veupath_chatbot.persistence.models import StreamProjection

        # Dismiss ALL projections — prevents WDK sync from re-importing them.
        # We dismiss all (not just WDK-linked) to avoid a race condition where
        # auto-build assigns a wdk_strategy_id between classification and delete.
        if stream_ids:
            from uuid import UUID

            all_uuids = [UUID(sid) for sid in stream_ids]
            await session.execute(
                update(StreamProjection)
                .where(StreamProjection.stream_id.in_(all_uuids))
                .values(dismissed_at=datetime.now(UTC))
            )
            # Counts streams, not the number of projection rows updated.
            dismissed_count = len(stream_ids)

    # ── 5. Delete gene sets, experiments, control sets ─────────────
    gs_del = delete(GeneSetRow).where(GeneSetRow.user_id == uid_str)
    if site_id:
        gs_del = gs_del.where(GeneSetRow.site_id == site_id)
    gr = cast(CursorResult[object], await session.execute(gs_del))
    pg_gene_sets = gr.rowcount or 0

    exp_del = delete(ExperimentRow).where(ExperimentRow.user_id == uid_str)
    if site_id:
        exp_del = exp_del.where(ExperimentRow.site_id == site_id)
    er = cast(CursorResult[object], await session.execute(exp_del))
    pg_experiments = er.rowcount or 0

    cs_del = delete(ControlSet).where(ControlSet.user_id == user_id)
    if site_id:
        cs_del = cs_del.where(ControlSet.site_id == site_id)
    cr = cast(CursorResult[object], await session.execute(cs_del))
    pg_control_sets = cr.rowcount or 0

    # Single commit for every database change made above.
    await session.commit()

    # ── 6. Clear in-memory caches so stale data doesn't reappear ──
    try:
        from veupath_chatbot.services.gene_sets.store import get_gene_set_store

        cache = get_gene_set_store()
        # Snapshot the listing first: entries are removed while looping.
        # NOTE(review): reaches into the store's private ``_cache``; prefer a
        # public eviction method if the store offers one.
        for gs in list(cache.list_for_user(user_id, site_id=site_id)):
            cache._cache.pop(gs.id, None)
    except Exception as exc:
        logger.warning(
            "Failed to clear gene set cache during user data purge",
            user_id=uid_str,
            error=str(exc),
        )

    strategies_handled = hard_deleted_count + dismissed_count

    logger.info(
        "Purged user data",
        user_id=uid_str,
        site_id=site_id,
        delete_wdk=delete_wdk,
        strategies=strategies_handled,
        wdk_strategies=wdk_deleted,
        redis_streams=redis_deleted,
        gene_sets=pg_gene_sets,
        experiments=pg_experiments,
        control_sets=pg_control_sets,
    )

    return {
        "ok": True,
        "deleted": {
            "strategies": strategies_handled,
            "wdkStrategies": wdk_deleted,
            "redisStreams": redis_deleted,
            "geneSets": pg_gene_sets,
            "experiments": pg_experiments,
            "controlSets": pg_control_sets,
        },
    }