Source code for veupath_chatbot.transport.http.routers.strategies.crud

"""Strategy CRUD endpoints — CQRS only (streams + stream_projections)."""

from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Query, Response

from veupath_chatbot.platform.errors import ErrorCode, NotFoundError, ValidationError
from veupath_chatbot.platform.events import read_stream_messages, read_stream_thinking
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.redis import get_redis
from veupath_chatbot.platform.types import JSONObject
from veupath_chatbot.services.strategies.plan_validation import validate_plan_or_raise
from veupath_chatbot.services.strategies.wdk_sync import (
    lazy_fetch_wdk_detail,
    sync_is_saved_to_wdk,
)
from veupath_chatbot.services.wdk import get_strategy_api
from veupath_chatbot.transport.http.deps import CurrentUser, StreamRepo
from veupath_chatbot.transport.http.schemas import (
    CreateStrategyRequest,
    StrategyResponse,
    UpdateStrategyRequest,
)

from .._authz import get_owned_projection_or_404
from ._shared import (
    build_projection_response,
    build_projection_summary,
)

router = APIRouter(prefix="/api/v1/strategies", tags=["strategies"])
logger = get_logger(__name__)


[docs] @router.get("", response_model=list[StrategyResponse]) async def list_strategies( stream_repo: StreamRepo, user_id: CurrentUser, site_id: Annotated[str | None, Query(alias="siteId")] = None, ) -> list[StrategyResponse]: """List user's conversation streams (projections).""" projections = await stream_repo.list_projections(user_id, site_id) return [build_projection_summary(p, site_id=site_id or "") for p in projections]
[docs] @router.get("/dismissed", response_model=list[StrategyResponse]) async def list_dismissed_strategies( stream_repo: StreamRepo, user_id: CurrentUser, site_id: Annotated[str | None, Query(alias="siteId")] = None, ) -> list[StrategyResponse]: """List user's dismissed (soft-deleted) strategies.""" projections = await stream_repo.list_dismissed_projections(user_id, site_id) return [build_projection_summary(p, site_id=site_id or "") for p in projections]
[docs] @router.post("", response_model=StrategyResponse, status_code=201) async def create_strategy( request: CreateStrategyRequest, stream_repo: StreamRepo, user_id: CurrentUser, ) -> StrategyResponse: """Create a new strategy (CQRS only).""" plan_in = request.plan.model_dump(exclude_none=True) strategy_ast = validate_plan_or_raise(plan_in) plan = strategy_ast.to_dict() stream = await stream_repo.create( user_id=user_id, site_id=request.site_id, name=request.name, ) await stream_repo.update_projection( stream.id, plan=plan, record_type=strategy_ast.record_type, step_count=len(strategy_ast.get_all_steps()), ) projection = await stream_repo.get_projection(stream.id) if not projection: raise NotFoundError( code=ErrorCode.STRATEGY_NOT_FOUND, title="Strategy not found" ) return build_projection_response(projection)
[docs] @router.get("/{strategyId:uuid}", response_model=StrategyResponse) async def get_strategy( strategyId: UUID, stream_repo: StreamRepo, user_id: CurrentUser, ) -> StrategyResponse: """Get a strategy/stream by ID from the CQRS projection + Redis.""" projection = await get_owned_projection_or_404(stream_repo, strategyId, user_id) # Lazy detail fetch: if this is a WDK-linked strategy with no plan data, # fetch the full detail from WDK now (summary-only projections are created # during sync-wdk to avoid the N+1 problem). projection = await lazy_fetch_wdk_detail( projection=projection, stream_repo=stream_repo, ) redis = get_redis() messages = await read_stream_messages(redis, str(strategyId)) thinking = await read_stream_thinking(redis, str(strategyId)) return build_projection_response(projection, messages=messages, thinking=thinking)
[docs] @router.patch("/{strategyId:uuid}", response_model=StrategyResponse) async def update_strategy( strategyId: UUID, request: UpdateStrategyRequest, stream_repo: StreamRepo, user_id: CurrentUser, ) -> StrategyResponse: """Update a strategy (CQRS only).""" await get_owned_projection_or_404(stream_repo, strategyId, user_id) strategy_ast = None record_type = None plan: JSONObject | None = None if request.plan: plan_in = request.plan.model_dump(exclude_none=True) strategy_ast = validate_plan_or_raise(plan_in) plan = strategy_ast.to_dict() record_type = strategy_ast.record_type fields_set: set[str] = getattr(request, "model_fields_set", set()) wdk_strategy_id_set = "wdk_strategy_id" in fields_set is_saved_set = "is_saved" in fields_set await stream_repo.update_projection( strategyId, name=request.name, plan=plan, record_type=record_type, wdk_strategy_id=request.wdk_strategy_id, wdk_strategy_id_set=wdk_strategy_id_set, is_saved=request.is_saved, is_saved_set=is_saved_set, step_count=len(strategy_ast.get_all_steps()) if strategy_ast else None, ) # Re-fetch updated projection. updated = await stream_repo.get_projection(strategyId) if not updated: raise NotFoundError( code=ErrorCode.STRATEGY_NOT_FOUND, title="Strategy not found" ) # If isSaved was toggled and WDK strategy exists, sync the flag to WDK. if is_saved_set and updated.wdk_strategy_id: await sync_is_saved_to_wdk(projection=updated) return build_projection_response(updated)
[docs] @router.delete("/{strategyId:uuid}", status_code=204, response_class=Response) async def delete_strategy( strategyId: UUID, stream_repo: StreamRepo, user_id: CurrentUser, delete_from_wdk: Annotated[bool, Query(alias="deleteFromWdk")] = False, ) -> Response: """Delete a strategy: cancel ops, clean Redis stream, delete CQRS records. For WDK-linked strategies with ``deleteFromWdk=false`` (default), the strategy is soft-deleted (dismissed) instead of hard-deleted. This prevents WDK sync from re-importing it. Use the restore endpoint to un-dismiss. Pass ``deleteFromWdk=true`` to hard-delete from both PathFinder and WDK. Non-WDK strategies are always hard-deleted. """ projection = await get_owned_projection_or_404(stream_repo, strategyId, user_id) # Cancel any active operations for this stream before deleting. from veupath_chatbot.services.chat.orchestrator import cancel_chat_operation active_ops = await stream_repo.get_active_operations(strategyId) for op in active_ops: await cancel_chat_operation(op.operation_id) # Clean up Redis stream. redis = get_redis() await redis.delete(f"stream:{strategyId}") is_wdk_linked = projection.wdk_strategy_id is not None if is_wdk_linked and not delete_from_wdk: # Soft-delete: dismiss the projection (hidden from list, skipped by sync). await stream_repo.dismiss(strategyId) else: # Hard-delete: remove from WDK if requested, then delete CQRS records. if delete_from_wdk and projection.wdk_strategy_id and projection.stream: try: api = get_strategy_api(projection.stream.site_id) await api.delete_strategy(projection.wdk_strategy_id) except Exception as e: logger.warning( "WDK strategy delete failed", wdk_strategy_id=projection.wdk_strategy_id, error=str(e), ) await stream_repo.delete(strategyId) return Response(status_code=204)
[docs] @router.get("/{strategyId:uuid}/ast") async def get_strategy_ast( strategyId: UUID, stream_repo: StreamRepo, user_id: CurrentUser, ) -> JSONObject: """Return the raw plan AST from a strategy's projection.""" projection = await get_owned_projection_or_404(stream_repo, strategyId, user_id) plan = projection.plan if not plan or not isinstance(plan, dict): raise NotFoundError( code=ErrorCode.STRATEGY_NOT_FOUND, title="Strategy has no plan AST", ) return plan
[docs] @router.post("/{strategyId:uuid}/restore", response_model=StrategyResponse) async def restore_strategy( strategyId: UUID, stream_repo: StreamRepo, user_id: CurrentUser, ) -> StrategyResponse: """Restore a dismissed (soft-deleted) strategy. Clears dismissed_at, resets plan to empty (triggers lazy WDK re-fetch), and wipes message history. The strategy reappears as if freshly imported. """ projection = await get_owned_projection_or_404(stream_repo, strategyId, user_id) if projection.dismissed_at is None: raise ValidationError( detail="Strategy is not dismissed", errors=[ { "path": "strategyId", "message": "Not dismissed", "code": "INVALID_STATE", } ], ) await stream_repo.restore(strategyId) # Wipe Redis messages (clean slate). redis = get_redis() await redis.delete(f"stream:{strategyId}") updated = await stream_repo.get_projection(strategyId) if not updated: raise NotFoundError( code=ErrorCode.STRATEGY_NOT_FOUND, title="Strategy not found" ) return build_projection_summary(updated)