Source code for veupath_chatbot.jobs.rag_startup

import asyncio
from pathlib import Path

from veupath_chatbot.integrations.vectorstore.ingest.public_strategies import (
    ingest_public_strategies,
)
from veupath_chatbot.integrations.vectorstore.ingest.wdk_catalog import (
    ingest_wdk_catalog,
)
from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.logging import get_logger

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Handle to the background ingestion task created by
# ``start_rag_startup_ingestion_background``; ``None`` until a task has been
# scheduled. Keeping it in a module global retains a reference to the task.
_startup_task: asyncio.Task[None] | None = None
# Held while ``_startup_task`` is inspected so the "already running?" check is
# serialized between concurrent callers.
_startup_lock = asyncio.Lock()


async def start_rag_startup_ingestion_background() -> None:
    """Schedule incremental RAG ingestion as a background task.

    The ingestion work is handed to ``asyncio.create_task`` and is not
    awaited here, so this coroutine returns as soon as the task has been
    scheduled.

    - Does nothing unless ``settings.rag_enabled`` is truthy.
    - Logs a warning and does nothing when ``settings.openai_api_key`` is
      not set.
    - At most one ingestion task is in flight at a time: a call made while a
      previously scheduled task has not finished returns without scheduling
      another one. A call made after that task has finished schedules a new
      run.

    Any ``Exception`` raised by either ingestion step is caught inside the
    background task and logged as a warning.
    """
    global _startup_task

    settings = get_settings()
    if not settings.rag_enabled:
        return
    if not settings.openai_api_key:
        logger.warning(
            "RAG enabled but OPENAI_API_KEY not set; skipping startup ingestion"
        )
        return

    async def _run() -> None:
        """Run the WDK catalog ingestion, then the public-strategies ingestion."""
        max_per_site = settings.rag_startup_max_strategies_per_site
        conc = settings.rag_startup_public_strategies_concurrency
        # Strip before applying the default so that a whitespace-only setting
        # falls back to the default model instead of becoming an empty string.
        configured_model = (
            settings.rag_startup_public_strategies_llm_model or ""
        ).strip()
        llm_model = configured_model or "gpt-4.1-nano"
        report_path = Path(
            settings.rag_startup_public_strategies_report_path
            or "/tmp/ingest_public_strategies_report.jsonl"
        )
        logger.info(
            "RAG startup ingestion begin",
            maxStrategiesPerSite=max_per_site,
            publicStrategiesConcurrency=conc,
            publicStrategiesLlmModel=llm_model,
        )
        try:
            # 1) WDK catalog (record types + searches)
            await ingest_wdk_catalog(sites=None, reset=False, skip_existing=True)
            # 2) Example plans from public strategies
            await ingest_public_strategies(
                sites=None,
                reset=False,
                skip_existing=True,
                llm_model=llm_model,
                report_path=report_path,
                max_strategies_per_site=max_per_site,
                concurrency=conc,
            )
        except Exception as exc:
            # Deliberately best-effort: a failed ingestion is reported but
            # must not surface as an unhandled task exception.
            logger.warning(
                "RAG startup ingestion incomplete",
                error=str(exc),
                errorType=type(exc).__name__,
            )
        else:
            logger.info("RAG startup ingestion complete")

    async with _startup_lock:
        # Skip scheduling while a previously created task is still running.
        if _startup_task is not None and not _startup_task.done():
            return
        _startup_task = asyncio.create_task(_run())