"""Step creation methods for the Strategy API.
Provides :class:`StepsMixin` with methods to create search steps,
combined (boolean) steps, transform steps, and datasets.
"""
from typing import cast
import httpx
from veupath_chatbot.domain.parameters.specs import unwrap_search_data
from veupath_chatbot.integrations.veupathdb.param_utils import wdk_entity_name
from veupath_chatbot.integrations.veupathdb.strategy_api.base import StrategyAPIBase
from veupath_chatbot.platform.errors import InternalError
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject
logger = get_logger(__name__)
[docs]
class StepsMixin(StrategyAPIBase):
    """Mixin providing step creation and dataset upload methods.

    The methods visible in this class read ``self.client``, ``self.user_id``,
    ``self._boolean_search_cache`` and ``self._answer_param_cache``, and call
    ``self._ensure_session()``, ``self._normalize_parameters()``,
    ``self._expand_profile_pattern_groups()`` and
    ``self._expand_tree_params_to_leaves()``. None of these are defined in the
    portion of the class shown here, so they are presumably supplied by
    :class:`StrategyAPIBase` or a sibling mixin (verify there).
    """
async def _get_boolean_search_name(self, record_type: str) -> str:
    """Look up the name of the boolean combine search for a record type.

    A successful lookup is stored in ``self._boolean_search_cache`` so the
    search list is not fetched again for the same record type.

    :param record_type: WDK record type.
    :returns: Name of the first search whose name starts with
        ``boolean_question``.
    :raises ValueError: If the record type has no such search.
    """
    cache = self._boolean_search_cache
    if record_type in cache:
        return cache[record_type]

    available = await self.client.get_searches(record_type)
    # Resolve names lazily so scanning stops at the first match.
    entity_names = (wdk_entity_name(entry) for entry in available)
    match = next(
        (n for n in entity_names if n.startswith("boolean_question")),
        None,
    )
    if match is None:
        raise ValueError(
            f"No boolean combine search found for record type '{record_type}'"
        )

    cache[record_type] = match
    return match
async def _get_boolean_param_names(self, record_type: str) -> tuple[str, str, str]:
    """Find the left, right and operator parameter names of the boolean search.

    The names are taken from the ``paramNames`` list of the boolean combine
    search's details and matched by prefix (``bq_left_op``, ``bq_right_op``,
    ``bq_operator``); the first name with each prefix wins.

    :param record_type: WDK record type.
    :returns: Tuple ``(left, right, operator)`` of parameter names.
    :raises ValueError: If ``paramNames`` is not a list, or any of the three
        parameter names cannot be found.
    """
    combine_search = await self._get_boolean_search_name(record_type)
    raw_details = await self.client.get_search_details(record_type, combine_search)
    # WDK wraps search details under JsonKeys.SEARCH_DATA = "searchData";
    # fall back to the raw payload when unwrapping yields nothing.
    payload = unwrap_search_data(raw_details) or raw_details

    # WDK emits JsonKeys.PARAM_NAMES = "paramNames" as a list of name strings.
    declared = payload.get("paramNames")
    if not isinstance(declared, list):
        raise ValueError(
            f"Boolean search '{combine_search}' has no 'paramNames' list"
        )
    param_names = [str(entry) for entry in declared if entry is not None]

    def first_with_prefix(prefix: str) -> str | None:
        # First declared parameter name starting with ``prefix``, else None.
        return next((n for n in param_names if n.startswith(prefix)), None)

    left = first_with_prefix("bq_left_op")
    right = first_with_prefix("bq_right_op")
    op = first_with_prefix("bq_operator")
    if not (left and right and op):
        raise ValueError(
            f"Boolean param names not found for record type '{record_type}' "
            f"(left={left}, right={right}, op={op}, params={param_names})"
        )
    return left, right, op
async def _get_answer_param_names(
    self,
    record_type: str,
    search_name: str,
) -> set[str]:
    """Return the set of ``input-step`` (AnswerParam) names for a search.

    Successful results are cached per ``record_type/search_name`` pair in
    ``self._answer_param_cache``. The lookup is best-effort: a malformed
    payload or a caught error yields an empty set that is *not* cached, so
    a later call will try again.

    :param record_type: WDK record type.
    :param search_name: Search/question URL segment.
    :returns: Set of parameter names whose type is ``input-step``; empty if
        the details could not be fetched or parsed.
    """
    cache_key = f"{record_type}/{search_name}"
    if cache_key in self._answer_param_cache:
        return self._answer_param_cache[cache_key]
    try:
        details = await self.client.get_search_details(record_type, search_name)
        search_data = unwrap_search_data(details)
        if not isinstance(search_data, dict):
            return set()
        params = search_data.get("parameters", [])
        if not isinstance(params, list):
            return set()
        names = {
            str(p["name"])
            for p in params
            if isinstance(p, dict) and p.get("type") == "input-step"
        }
        self._answer_param_cache[cache_key] = names
        return names
    # The exception tuple is parenthesized: the bare ``except A, B, C:`` form
    # is only accepted from Python 3.14 (PEP 758) and is a SyntaxError on
    # earlier Python 3 releases. The parenthesized form means the same thing
    # everywhere.
    except (httpx.HTTPError, KeyError, TypeError):
        logger.warning(
            "Failed to fetch answer param names for %s/%s",
            record_type,
            search_name,
            exc_info=True,
        )
        return set()
[docs]
async def create_dataset(self, ids: list[str]) -> int:
    """Upload a list of IDs as a WDK dataset and return the new dataset ID.

    WDK DatasetParam parameters (type ``input-dataset``) take an integer
    dataset ID rather than the raw IDs. This method posts the IDs to
    ``/users/{userId}/datasets`` and hands back the integer ``id`` from the
    response so it can be used as the parameter value.

    :param ids: List of record IDs (e.g. gene locus tags).
    :returns: Integer dataset ID.
    :raises InternalError: If the response is not a dict carrying an integer
        ``id``.
    """
    await self._ensure_session()
    body = cast(
        JSONObject,
        {"sourceType": "idList", "sourceContent": {"ids": ids}},
    )
    response = await self.client.post(
        f"/users/{self.user_id}/datasets",
        json=body,
    )

    # Anything other than a dict with an integer "id" counts as a failure.
    dataset_id = response.get("id") if isinstance(response, dict) else None
    if not isinstance(dataset_id, int):
        raise InternalError(
            title="Dataset creation failed",
            detail=f"WDK returned unexpected response: {response!r}",
        )

    logger.info(
        "Created WDK dataset",
        dataset_id=dataset_id,
        id_count=len(ids),
    )
    return dataset_id
[docs]
async def create_step(
    self,
    record_type: str,
    search_name: str,
    parameters: JSONObject,
    custom_name: str | None = None,
    wdk_weight: int | None = None,
) -> JSONObject:
    """Create a step that is not yet attached to any strategy.

    Parameters are normalized and expanded before the step is posted to
    ``/users/{userId}/steps``.

    :param record_type: Record type (e.g., "gene", "transcript").
    :param search_name: Name of the search question.
    :param parameters: Search parameters.
    :param custom_name: Optional custom name for the step; omitted from the
        payload when empty.
    :param wdk_weight: Optional WDK weight for result ranking in combined
        strategies; omitted from the payload when ``None``.
    :returns: Created step data with stepId.
    """
    params = self._normalize_parameters(parameters)

    # GenesByOrthologPattern carries group codes in profile_pattern that
    # have to be expanded before submission.
    needs_pattern_expansion = (
        search_name == "GenesByOrthologPattern" and "profile_pattern" in params
    )
    if needs_pattern_expansion:
        expanded_pattern = await self._expand_profile_pattern_groups(
            record_type,
            params["profile_pattern"],
        )
        params["profile_pattern"] = expanded_pattern

    # Multi-pick-vocabulary params with countOnlyLeaves=true (e.g., organism)
    # need parent tree nodes replaced by their leaves. WDK silently returns 0
    # genes for parent nodes; the frontend's CheckboxTree auto-selects leaf
    # descendants, and that behaviour is replicated here.
    params = await self._expand_tree_params_to_leaves(
        record_type,
        search_name,
        params,
    )

    config: JSONObject = {"parameters": cast(JSONObject, params)}
    if wdk_weight is not None:
        config["wdkWeight"] = wdk_weight

    body: JSONObject = {"searchName": search_name, "searchConfig": config}
    if custom_name:
        body["customName"] = custom_name

    logger.info(
        "Creating WDK step",
        record_type=record_type,
        search_name=search_name,
    )
    await self._ensure_session()
    response = await self.client.post(
        f"/users/{self.user_id}/steps",
        json=body,
    )
    return cast(JSONObject, response)
[docs]
async def create_combined_step(
    self,
    primary_step_id: int,
    secondary_step_id: int,
    boolean_operator: str,
    record_type: str,
    custom_name: str | None = None,
    wdk_weight: int | None = None,
) -> JSONObject:
    """Create a combined step (boolean operation).

    The step is posted with empty left/right input parameters; the two step
    IDs are only logged here and are not part of the request payload.

    :param primary_step_id: ID of the primary (left) step.
    :param secondary_step_id: ID of the secondary (right) step.
    :param boolean_operator: One of INTERSECT, UNION, MINUS, RMINUS, LONLY, RONLY.
    :param record_type: WDK record type.
    :param custom_name: Optional custom name; omitted from the payload when
        empty.
    :param wdk_weight: Optional WDK weight for result ranking in combined
        strategies; omitted from the payload when ``None``.
    :returns: Created step data.
    """
    await self._ensure_session()
    combine_search = await self._get_boolean_search_name(record_type)
    names = await self._get_boolean_param_names(record_type)
    left_name, right_name, operator_name = names

    # WDK requires empty inputs here; inputs are wired via stepTree
    combine_params = {
        left_name: "",
        right_name: "",
        operator_name: boolean_operator,
    }
    config: JSONObject = {"parameters": combine_params}
    if wdk_weight is not None:
        config["wdkWeight"] = wdk_weight

    body: JSONObject = {"searchName": combine_search, "searchConfig": config}
    if custom_name:
        body["customName"] = custom_name

    logger.info(
        "Creating combined step",
        primary=primary_step_id,
        secondary=secondary_step_id,
        operator=boolean_operator,
    )
    response = await self.client.post(
        f"/users/{self.user_id}/steps",
        json=body,
    )
    return cast(JSONObject, response)