Source code for veupath_chatbot.services.control_helpers
"""Formatting and parsing utilities for control-test evaluation."""
from veupath_chatbot.integrations.veupathdb.strategy_api import (
StrategyAPI,
is_internal_wdk_strategy_name,
strip_internal_wdk_strategy_name,
)
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.services.experiment.types import ControlValueFormat
logger = get_logger(__name__)
def _encode_id_list(ids: list[str], fmt: ControlValueFormat) -> str:
cleaned = [str(x).strip() for x in ids if str(x).strip()]
if fmt == "newline":
return "\n".join(cleaned)
if fmt == "comma":
return ",".join(cleaned)
# json_list
import json
return json.dumps(cleaned)
[docs]
async def delete_temp_strategy(api: StrategyAPI, strategy_id: int | None) -> None:
"""Best-effort deletion of a temporary WDK strategy.
Silently logs and swallows errors — callers should use this in
``finally`` blocks to avoid masking the original exception.
"""
if strategy_id is None:
return
try:
await api.delete_strategy(strategy_id)
except Exception as exc:
logger.warning(
"Failed to delete temp WDK strategy during cleanup",
temp_strategy_id=strategy_id,
error=str(exc),
)
async def _get_total_count_for_step(api: StrategyAPI, step_id: int) -> int | None:
"""Get totalCount for a step. Returns None on failure."""
try:
return await api.get_step_count(step_id)
except Exception as exc:
logger.debug(
"Failed to get total count for step", step_id=step_id, error=str(exc)
)
return None
[docs]
async def cleanup_internal_control_test_strategies(
api: StrategyAPI,
wdk_items: object,
*,
site_id: str = "",
) -> None:
"""Delete leaked internal control-test strategies from a WDK item list.
Callers fetch the item list themselves (via ``api.list_strategies()``),
then pass it here for cleanup.
"""
if not isinstance(wdk_items, list):
return
for item in wdk_items:
if not isinstance(item, dict):
continue
name_raw = item.get("name")
name = name_raw if isinstance(name_raw, str) else None
if not isinstance(name, str) or not is_internal_wdk_strategy_name(name):
continue
display_name = strip_internal_wdk_strategy_name(name)
if not display_name.startswith("Pathfinder control test"):
continue
wdk_id = item.get("strategyId")
if not isinstance(wdk_id, int):
continue
try:
await api.delete_strategy(wdk_id)
logger.info(
"Deleted leaked internal control-test WDK strategy",
site_id=site_id,
wdk_strategy_id=wdk_id,
)
except Exception as e:
logger.warning(
"Failed to delete leaked internal control-test strategy",
site_id=site_id,
wdk_strategy_id=wdk_id,
error=str(e),
)