"""WDK-backed strategy endpoints (open/import/sync/list)."""
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Query
from shared_py.defaults import DEFAULT_STREAM_NAME
from veupath_chatbot.platform.errors import (
ErrorCode,
NotFoundError,
ValidationError,
WDKError,
)
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.services.control_helpers import (
cleanup_internal_control_test_strategies,
)
from veupath_chatbot.services.strategies.auto_import import (
background_auto_import_gene_sets,
)
from veupath_chatbot.services.strategies.wdk_conversion import parse_wdk_strategy_id
from veupath_chatbot.services.strategies.wdk_sync import (
sync_to_projection,
upsert_summary_projection,
)
from veupath_chatbot.services.wdk import (
get_site,
get_strategy_api,
is_internal_wdk_strategy_name,
)
from veupath_chatbot.transport.http.deps import (
CurrentUser,
StreamRepo,
)
from veupath_chatbot.transport.http.schemas import (
OpenStrategyRequest,
OpenStrategyResponse,
StrategyResponse,
)
from ._shared import build_projection_summary
# Router for the endpoints defined in this module; every route is mounted
# under the /api/v1/strategies prefix and tagged "strategies".
router = APIRouter(prefix="/api/v1/strategies", tags=["strategies"])
# Module-level logger, named after this module.
logger = get_logger(__name__)
# [docs]  (appears to be leftover link text from a rendered documentation page; not part of the module)
@router.post("/open", response_model=OpenStrategyResponse)
async def open_strategy(
    request: OpenStrategyRequest,
    stream_repo: StreamRepo,
    user_id: CurrentUser,
) -> OpenStrategyResponse:
    """Resolve an "open strategy" request to the id of a local stream.

    The request is handled in one of three ways, depending on which ids it
    carries:

    * ``strategy_id`` is set: the existing projection is looked up and its
      stream must belong to ``user_id``.
    * neither ``strategy_id`` nor ``wdk_strategy_id`` is set: a new stream
      named ``DEFAULT_STREAM_NAME`` is created on ``site_id``.
    * only ``wdk_strategy_id`` is set: that WDK strategy is synced into a
      projection with ``sync_to_projection``.

    Args:
        request: Body carrying ``strategy_id``, ``wdk_strategy_id`` and
            ``site_id``.
        stream_repo: Repository used to look up and create streams.
        user_id: Id of the calling user.

    Returns:
        A response whose ``strategyId`` is the id of the opened stream.

    Raises:
        ValidationError: ``site_id`` is missing and no ``strategy_id`` was
            given.
        NotFoundError: ``strategy_id`` does not resolve to a projection with
            a stream, or that stream belongs to another user.
        WDKError: Loading the WDK strategy failed. Exceptions of other types
            raised during the sync are wrapped in a ``WDKError``.
    """
    # Case 1: an existing local strategy.
    if request.strategy_id:
        projection = await stream_repo.get_projection(request.strategy_id)
        # A stream owned by another user is reported exactly like a missing
        # one, so the response does not reveal that the id exists.
        if (
            not projection
            or not projection.stream
            or projection.stream.user_id != user_id
        ):
            raise NotFoundError(
                code=ErrorCode.STRATEGY_NOT_FOUND, title="Strategy not found"
            )
        return OpenStrategyResponse(strategyId=projection.stream_id)

    # Both remaining cases operate on a site, so validate it once here.
    site_id = request.site_id
    if not site_id:
        raise ValidationError(
            detail="siteId is required",
            errors=[
                {
                    "path": "siteId",
                    "message": "Required",
                    "code": "INVALID_PARAMETERS",
                }
            ],
        )

    # Bound to a local so the truthiness check below also narrows the
    # optional id for the sync call.
    wdk_strategy_id = request.wdk_strategy_id

    # Case 2: no id at all -> new conversation: create Stream + StreamProjection.
    if not wdk_strategy_id:
        stream = await stream_repo.create(
            user_id=user_id,
            site_id=site_id,
            name=DEFAULT_STREAM_NAME,
        )
        return OpenStrategyResponse(strategyId=stream.id)

    # Case 3: sync the WDK strategy into a local projection.
    try:
        api = get_strategy_api(site_id)
        synced = await sync_to_projection(
            wdk_id=wdk_strategy_id,
            site_id=site_id,
            api=api,
            stream_repo=stream_repo,
            user_id=user_id,
        )
    except WDKError as e:
        # Already the error type callers expect; log and propagate unchanged.
        logger.error("WDK fetch failed", error=str(e))
        raise
    except Exception as e:
        # Anything else is surfaced as a WDKError, keeping the original
        # exception as the cause.
        logger.error("WDK fetch failed", error=str(e))
        raise WDKError(f"Failed to load WDK strategy: {e}") from e
    return OpenStrategyResponse(strategyId=synced.stream_id)
# [docs]  (appears to be leftover link text from a rendered documentation page; not part of the module)
@router.post("/sync-wdk", response_model=list[StrategyResponse])
async def sync_all_wdk_strategies(
    site_id: Annotated[str, Query(alias="siteId")],
    stream_repo: StreamRepo,
    user_id: CurrentUser,
    background_tasks: BackgroundTasks,
) -> list[StrategyResponse]:
    """Batch-sync all WDK strategies into the CQRS layer and return the full list.

    The sync is best-effort. A failure while listing WDK strategies (or
    while cleaning up internal control-test strategies) is logged and
    treated as an empty listing; a failure on a single strategy or in the
    prune step is logged and skipped. In every case the endpoint still
    returns whatever projections exist locally.

    Steps:

    1. Resolve the site and fetch the user's WDK strategy listing.
    2. Upsert a summary projection for every listed strategy that has a
       parseable WDK id and a non-internal name, each inside its own nested
       transaction.
    3. If the listing was non-empty, prune local streams whose WDK id is not
       in the listing.
    4. List the projections, commit, and schedule the gene-set auto-import
       as a background task.

    Args:
        site_id: Site identifier, read from the ``siteId`` query parameter.
        stream_repo: Repository (and session) used for all reads and writes.
        user_id: Id of the calling user.
        background_tasks: Task collector used to schedule the auto-import.

    Returns:
        One summary per projection returned by
        ``stream_repo.list_projections`` for this user and site.
    """
    site = get_site(site_id)

    try:
        api = get_strategy_api(site.id)
        wdk_items = await api.list_strategies()
        await cleanup_internal_control_test_strategies(api, wdk_items, site_id=site.id)
    except Exception as e:
        # Fall back to an empty listing so the endpoint can still return the
        # locally known projections.
        logger.warning("WDK list failed during sync", site_id=site.id, error=str(e))
        wdk_items = []

    synced_wdk_ids: set[int] = set()
    for item in wdk_items:
        if not isinstance(item, dict):
            continue
        wdk_id = parse_wdk_strategy_id(item)
        if wdk_id is None:
            continue
        name_raw = item.get("name")
        name = name_raw if isinstance(name_raw, str) else ""
        if is_internal_wdk_strategy_name(name):
            continue

        # Recorded before the upsert so that a strategy whose upsert fails is
        # still counted as present in WDK by the prune step below.
        synced_wdk_ids.add(wdk_id)
        try:
            # Nested transaction: a failure on one strategy does not abort
            # the work done for the others.
            async with stream_repo.session.begin_nested():
                await upsert_summary_projection(
                    item,
                    stream_repo=stream_repo,
                    user_id=user_id,
                    site_id=site.id,
                )
        except Exception as e:
            logger.warning(
                "Failed to sync WDK strategy",
                wdk_id=wdk_id,
                site_id=site.id,
                error=str(e),
            )

    # Prune orphaned streams whose WDK counterparts no longer exist.
    # An empty listing is indistinguishable from a failed fetch here, so
    # pruning is skipped in that case.
    if wdk_items:
        try:
            async with stream_repo.session.begin_nested():
                pruned = await stream_repo.prune_wdk_orphans(
                    user_id, site.id, synced_wdk_ids
                )
            if pruned:
                logger.info(
                    "Pruned orphaned streams",
                    site_id=site.id,
                    pruned_count=pruned,
                )
        except Exception as e:
            logger.warning(
                "Failed to prune orphaned streams",
                site_id=site.id,
                error=str(e),
            )

    # Read back under the resolved ``site.id`` -- the same key every write
    # above used -- rather than the raw query value, so the listing matches
    # what was just synced even if ``get_site`` returns a different id than
    # the one passed in.
    projections = await stream_repo.list_projections(user_id, site.id)

    # Commit the session so all locks are released before the background task
    # opens its own session — prevents deadlock between the prune DELETE and
    # the auto-import's concurrent SELECT/UPDATE on the same tables.
    await stream_repo.session.commit()
    background_tasks.add_task(
        background_auto_import_gene_sets, site_id=site.id, user_id=user_id
    )

    # NOTE(review): the projections are read after the commit; this assumes
    # the session keeps loaded objects usable post-commit (e.g.
    # expire_on_commit=False) -- TODO confirm.
    return [build_projection_summary(p, site_id=site.id) for p in projections]