"""FastAPI application entrypoint."""
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import cast
from uuid import uuid4
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request as StarletteRequest
from starlette.responses import Response
from veupath_chatbot import __version__
from veupath_chatbot.integrations.vectorstore.bootstrap import ensure_rag_collections
from veupath_chatbot.integrations.vectorstore.qdrant_store import (
close_all_qdrant_stores,
)
from veupath_chatbot.integrations.veupathdb.factory import close_all_clients
from veupath_chatbot.integrations.veupathdb.site_search import close_site_search_client
from veupath_chatbot.jobs.rag_startup import start_rag_startup_ingestion_background
from veupath_chatbot.persistence.session import close_db, init_db
from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.context import (
request_base_url_ctx,
request_id_ctx,
veupathdb_auth_token_ctx,
)
from veupath_chatbot.platform.errors import (
AppError,
app_error_handler,
http_exception_handler,
)
from veupath_chatbot.platform.logging import get_logger, setup_logging
from veupath_chatbot.platform.redis import close_redis, init_redis
from veupath_chatbot.platform.security import limiter
from veupath_chatbot.transport.http.routers import (
chat,
control_sets,
experiments,
health,
internal,
models,
operations,
sites,
strategies,
tools,
veupathdb_auth,
)
from veupath_chatbot.transport.http.routers.exports import router as exports_router
from veupath_chatbot.transport.http.routers.gene_sets import router as gene_sets_router
logger = get_logger(__name__)
[docs]
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
"""Application lifespan handler."""
settings = get_settings()
# Startup
setup_logging()
logger.info(
"Starting Pathfinder API",
version=__version__,
env=settings.api_env,
)
# Initialize database
# For local Docker and first-run developer setups, we create tables automatically.
# (Alembic migrations are not supported.)
await init_db()
await init_redis()
# Mark any operations left "active" from a previous process as failed.
# This handles the Docker-rebuild / crash case where the producer task
# died without marking the operation complete.
from veupath_chatbot.persistence.repositories.stream import StreamRepository
from veupath_chatbot.persistence.session import async_session_factory
async with async_session_factory() as session:
repo = StreamRepository(session)
orphaned = await repo.list_active_operations()
for op in orphaned:
await repo.fail_operation(op.operation_id)
logger.info(
"Marked orphaned operation as failed", operation_id=op.operation_id
)
if orphaned:
await session.commit()
try:
await ensure_rag_collections()
except Exception as exc: # pragma: no cover
# Do not fail API startup if Qdrant is unavailable or misconfigured.
logger.warning("Failed to ensure RAG collections", error=str(exc))
try:
await start_rag_startup_ingestion_background()
except Exception as exc: # pragma: no cover
logger.warning("Failed to start RAG startup ingestion", error=str(exc))
yield
# Shutdown
logger.info("Shutting down Pathfinder API")
await close_all_qdrant_stores()
await close_all_clients()
await close_site_search_client()
await close_redis()
await close_db()
def _wire_ai_dependencies() -> None:
"""Wire AI-layer implementations into service-layer modules.
This is the composition root: the only place that links the AI
layer's concrete implementations to the service layer's injected
slots. Keeps services free of direct ``veupath_chatbot.ai`` imports.
"""
from veupath_chatbot.ai.agents.experiment import ExperimentAssistantAgent
from veupath_chatbot.ai.agents.factory import (
create_agent,
create_engine,
resolve_effective_model_id,
)
from veupath_chatbot.services.chat import orchestrator as chat_orchestrator
from veupath_chatbot.services.experiment import (
ai_analysis_tools as analysis_tools,
)
from veupath_chatbot.services.experiment import assistant as exp_assistant
# When chat_provider is "mock", override the default model to use MockEngine.
# This makes the REAL agent use a deterministic engine — all downstream
# systems (WDK, DB, Redis, gene sets, auto-build) still run real.
settings = get_settings()
if settings.chat_provider.strip().lower() == "mock":
settings.default_model_id = "mock/deterministic"
chat_orchestrator.configure(
create_agent_fn=create_agent,
resolve_model_id_fn=resolve_effective_model_id,
)
exp_assistant.configure(
create_engine_fn=create_engine,
experiment_agent_cls=ExperimentAssistantAgent,
)
analysis_tools.configure(
experiment_agent_cls=ExperimentAssistantAgent,
)
# Workbench chat orchestrator
from uuid import UUID
from kani import ChatMessage
from veupath_chatbot.ai.agents.workbench import WorkbenchAgent
from veupath_chatbot.platform.types import ModelProvider, ReasoningEffort
from veupath_chatbot.services.workbench_chat import (
orchestrator as wb_orchestrator,
)
def _create_workbench_agent(
site_id: str,
experiment_id: str,
user_id: UUID | None = None,
system_prompt: str = "",
chat_history: list[ChatMessage] | None = None,
provider_override: ModelProvider | None = None,
model_override: str | None = None,
reasoning_effort: ReasoningEffort | None = None,
) -> WorkbenchAgent:
engine = create_engine(
provider_override=provider_override,
model_override=model_override,
reasoning_effort=reasoning_effort,
)
return WorkbenchAgent(
engine=engine,
site_id=site_id,
experiment_id=experiment_id,
user_id=user_id,
system_prompt=system_prompt,
chat_history=chat_history,
)
wb_orchestrator.configure(
create_workbench_agent_fn=_create_workbench_agent,
resolve_model_id_fn=resolve_effective_model_id,
)
[docs]
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
_wire_ai_dependencies()
settings = get_settings()
app = FastAPI(
title="Pathfinder API",
description="VEuPathDB Strategy Builder Chatbot API",
version=__version__,
lifespan=lifespan,
docs_url="/docs" if settings.api_docs_enabled else None,
redoc_url="/redoc" if settings.api_docs_enabled else None,
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_origin_regex=settings.cors_origin_regex,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
allow_headers=[
"Authorization",
"Content-Type",
"X-Request-ID",
"X-VEUPATHDB-AUTH",
"X-VEUPATHDB-AUTHORIZATION",
],
)
# Rate limiter (slowapi)
app.state.limiter = limiter
# Request ID middleware
@app.middleware("http")
async def add_request_id(
request: StarletteRequest,
call_next: Callable[[StarletteRequest], Awaitable[Response]],
) -> Response:
request_id = request.headers.get("X-Request-ID", str(uuid4()))
request_id_ctx.set(request_id)
veupathdb_auth_token_ctx.set(
request.headers.get("X-VEUPATHDB-AUTH")
or request.headers.get("X-VEUPATHDB-AUTHORIZATION")
or request.cookies.get("Authorization")
)
# Capture the frontend origin for constructing full download URLs.
origin = (
request.headers.get("Origin")
or request.headers.get("Referer", "").rstrip("/").rsplit("/api/", 1)[0]
or (settings.cors_origins[0] if settings.cors_origins else None)
)
request_base_url_ctx.set(origin)
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
# Exception handlers
app.add_exception_handler(
AppError,
cast(
Callable[[StarletteRequest, Exception], Awaitable[Response]],
app_error_handler,
),
)
app.add_exception_handler(
HTTPException,
cast(
Callable[[StarletteRequest, Exception], Awaitable[Response]],
http_exception_handler,
),
)
app.add_exception_handler(
RateLimitExceeded,
cast(
Callable[[StarletteRequest, Exception], Awaitable[Response]],
lambda request, exc: Response(
content=str(exc.detail),
status_code=429,
headers={"Retry-After": "60"},
),
),
)
# Routers
app.include_router(health.router)
app.include_router(sites.router)
app.include_router(models.router)
app.include_router(tools.router)
app.include_router(chat.router)
app.include_router(strategies.router)
app.include_router(experiments.router)
app.include_router(control_sets.router)
app.include_router(veupathdb_auth.router)
app.include_router(operations.router)
app.include_router(gene_sets_router)
app.include_router(exports_router)
app.include_router(internal.router)
from veupath_chatbot.transport.http.routers import user_data
app.include_router(user_data.router)
# Dev-only routes (e2e / local dev with mock chat provider).
if settings.chat_provider.strip().lower() == "mock":
from veupath_chatbot.transport.http.routers import dev
app.include_router(dev.router)
return app
app = create_app()
if __name__ == "__main__":
import uvicorn
settings = get_settings()
uvicorn.run(
"veupath_chatbot.main:app",
host=settings.api_host,
port=settings.api_port,
reload=bool(settings.is_development),
)