Source code for veupath_chatbot.services.experiment.enrichment

"""Enrichment analysis via WDK step analysis API.

Wraps VEuPathDB's native GO, pathway, and word enrichment analyses
that are available through the step analysis endpoint.

Plugin names (from ``stepAnalysisPlugins.xml``):
  - ``go-enrichment``  → GoEnrichmentPlugin
  - ``pathway-enrichment``  → PathwaysEnrichmentPlugin
  - ``word-enrichment``  → WordEnrichmentPlugin

GO enrichment parameters (from ``GoEnrichmentPlugin.java``):
  - ``goAssociationsOntologies``  — "Molecular Function" / etc.
  - ``goEvidenceCodes``  — evidence code filter
  - ``goSubset``  — GO slim subset
  - ``pValueCutoff``  — p-value threshold
  - ``organism``  — organism filter

Parameters are fetched from the WDK analysis form defaults so required
fields like ``organism`` and ``pValueCutoff`` are always populated.
"""

import json
import re

from veupath_chatbot.domain.parameters.specs import unwrap_search_data
from veupath_chatbot.domain.strategy.ast import StepTreeNode
from veupath_chatbot.integrations.veupathdb.factory import get_strategy_api
from veupath_chatbot.integrations.veupathdb.strategy_api import StrategyAPI
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject, JSONValue
from veupath_chatbot.services.control_helpers import delete_temp_strategy
from veupath_chatbot.services.experiment.helpers import (
    coerce_step_id,
    extract_wdk_id,
    safe_float,
    safe_int,
)
from veupath_chatbot.services.experiment.types import (
    EnrichmentAnalysisType,
    EnrichmentResult,
    EnrichmentTerm,
)

logger = get_logger(__name__)

_ANALYSIS_TYPE_MAP: dict[EnrichmentAnalysisType, str] = {
    "go_function": "go-enrichment",
    "go_component": "go-enrichment",
    "go_process": "go-enrichment",
    "pathway": "pathway-enrichment",
    "word": "word-enrichment",
}

_GO_ONTOLOGY_MAP: dict[EnrichmentAnalysisType, str] = {
    "go_function": "Molecular Function",
    "go_component": "Cellular Component",
    "go_process": "Biological Process",
}

_REVERSE_GO_ONTOLOGY: dict[str, EnrichmentAnalysisType] = {
    v: k for k, v in _GO_ONTOLOGY_MAP.items()
}

_WDK_TO_ANALYSIS_TYPE: dict[str, EnrichmentAnalysisType] = {
    "pathway-enrichment": "pathway",
    "word-enrichment": "word",
}

_ENRICHMENT_ANALYSIS_NAMES = frozenset(_ANALYSIS_TYPE_MAP.values())


[docs] def infer_enrichment_type( wdk_analysis_name: str, params: JSONObject, result: JSONValue, ) -> EnrichmentAnalysisType: """Infer the ``EnrichmentAnalysisType`` from a WDK analysis name. For GO enrichment, uses the ``goAssociationsOntologies`` parameter or the ``goOntologies`` field in the result to determine which GO branch. """ if wdk_analysis_name in _WDK_TO_ANALYSIS_TYPE: return _WDK_TO_ANALYSIS_TYPE[wdk_analysis_name] # GO enrichment — determine which ontology ontology = str(params.get("goAssociationsOntologies", "")) # WDK vocab params arrive as JSON array strings, e.g. '["Molecular Function"]'. # Unwrap the first element so it matches _REVERSE_GO_ONTOLOGY keys. if ontology.startswith("["): try: parsed = json.loads(ontology) if isinstance(parsed, list) and parsed: ontology = str(parsed[0]) except json.JSONDecodeError, ValueError: ontology = "" if not ontology and isinstance(result, dict): ontologies = result.get("goOntologies") if isinstance(ontologies, list) and ontologies: ontology = str(ontologies[0]) return _REVERSE_GO_ONTOLOGY.get(ontology, "go_process")
[docs] def is_enrichment_analysis(wdk_analysis_name: str) -> bool: """Return True if the WDK analysis name is an enrichment plugin.""" return wdk_analysis_name in _ENRICHMENT_ANALYSIS_NAMES
[docs] def upsert_enrichment_result( results: list[EnrichmentResult], new: EnrichmentResult, ) -> None: """Replace an existing result of the same ``analysis_type``, or append. Mutates *results* in-place so callers don't accumulate duplicate tabs when the same enrichment analysis is re-run. """ for i, existing in enumerate(results): if existing.analysis_type == new.analysis_type: results[i] = new return results.append(new)
def _extract_result_totals(result: JSONValue) -> tuple[int, int]: """Extract total-analyzed and background-size from a WDK result dict. WDK enrichment plugins use different keys for the same concepts: ``resultSize`` / ``totalResults`` for the gene count, and ``backgroundSize`` / ``bgdSize`` for the background universe. :returns: ``(total_genes_analyzed, background_size)`` """ if not isinstance(result, dict): return 0, 0 total = safe_int(result.get("resultSize", result.get("totalResults", 0))) bg = safe_int(result.get("backgroundSize", result.get("bgdSize", 0))) return total, bg
[docs] def parse_enrichment_from_raw( wdk_analysis_name: str, params: JSONObject, result: JSONValue, ) -> EnrichmentResult: """Parse a raw WDK analysis result into an ``EnrichmentResult``. Used by the generic ``analyses/run`` endpoint to return structured enrichment data instead of raw JSON. """ analysis_type = infer_enrichment_type(wdk_analysis_name, params, result) rows = _extract_analysis_rows(result) terms = _parse_enrichment_terms(rows) total_analyzed, bg_size = _extract_result_totals(result) return EnrichmentResult( analysis_type=analysis_type, terms=terms, total_genes_analyzed=total_analyzed, background_size=bg_size, )
_LINK_COUNT_RE = re.compile(r">(\d+)<") _IDLIST_RE = re.compile(r"idList=([^&'\"]+)") def _parse_result_genes_html(html: str) -> tuple[int, list[str]]: """Extract gene count and gene IDs from a WDK ``resultGenes`` HTML link. WDK enrichment plugins render gene counts as hyperlinks:: <a href='...?param.ds_gene_ids.idList=GENE1,GENE2,...&autoRun=1'>32</a> Returns ``(count, gene_ids)``. """ count = 0 count_m = _LINK_COUNT_RE.search(html) if count_m: count = int(count_m.group(1)) genes: list[str] = [] id_m = _IDLIST_RE.search(html) if id_m: genes = [g.strip() for g in id_m.group(1).split(",") if g.strip()] return count, genes def _parse_enrichment_terms( rows: list[JSONObject], ) -> list[EnrichmentTerm]: """Parse WDK enrichment result rows into structured terms. Handles both the "standard" field names used by some WDK analysis plugins and the field names returned by the GO/pathway/word enrichment plugins (``goId``, ``goTerm``, ``resultGenes`` as HTML, ``bgdGenes``, ``foldEnrich``, etc.). """ terms: list[EnrichmentTerm] = [] for row in rows: if not isinstance(row, dict): continue # Term identifier — plugins use different keys: # GO: goId, Pathway: pathwayId, Word: word (no separate ID) term_id = str( row.get( "ID", row.get( "id", row.get( "goId", row.get("pathwayId", row.get("word", "")), ), ), ) ) # Term name / description — Word enrichment uses "descrip" (truncated). term_name = str( row.get( "Description", row.get( "description", row.get( "goTerm", row.get( "pathwayName", row.get("descrip", row.get("word", "")), ), ), ), ) ) # Gene count + gene list — WDK enrichment returns resultGenes as # an HTML <a> tag embedding the count and gene IDs in the URL. gene_count: int = 0 genes: list[str] = [] result_count_raw = row.get("ResultCount", row.get("resultCount")) if result_count_raw is not None: gene_count = safe_int(result_count_raw) else: result_genes_raw = row.get("resultGenes", "") if isinstance(result_genes_raw, str) and "<" in result_genes_raw: gene_count, genes = _parse_result_genes_html(result_genes_raw) else: gene_count = safe_int(result_genes_raw) # If genes weren't extracted from HTML, try explicit gene list fields if not genes: genes_raw = row.get("ResultIDList", row.get("genes")) if isinstance(genes_raw, str): genes = [g.strip() for g in genes_raw.split(",") if g.strip()] elif isinstance(genes_raw, list): genes = [str(g) for g in genes_raw] bg_count = safe_int( row.get("BgdCount", row.get("bgdCount", row.get("bgdGenes", 0))) ) fold = safe_float( row.get("FoldEnrich", row.get("foldEnrichment", row.get("foldEnrich", 0))) ) odds = safe_float(row.get("OddsRatio", row.get("oddsRatio", 0))) pval = safe_float(row.get("PValue", row.get("pValue", 1.0)), default=1.0) fdr = safe_float( row.get("BenjaminiHochberg", row.get("benjamini", 1.0)), default=1.0 ) bonf = safe_float( row.get("Bonferroni", row.get("bonferroni", 1.0)), default=1.0 ) terms.append( EnrichmentTerm( term_id=term_id, term_name=term_name, gene_count=gene_count, background_count=bg_count, fold_enrichment=fold, odds_ratio=odds, p_value=pval, fdr=fdr, bonferroni=bonf, genes=genes, ) ) return terms # WDK ``EnumParamFormatter.getParamType()`` emits these JSON type strings # for params extending ``AbstractEnumParam`` (``EnumParam``, ``FlatVocabParam``). # These are the only param types whose stable values must be JSON arrays # (via ``AbstractEnumParam.convertToTerms()`` → ``new JSONArray(stableValue)``). # See ``org.gusdb.wdk.core.api.JsonKeys``: # SINGLE_VOCAB_PARAM_TYPE = "single-pick-vocabulary" # MULTI_VOCAB_PARAM_TYPE = "multi-pick-vocabulary" _WDK_VOCAB_PARAM_TYPES = frozenset({"single-pick-vocabulary", "multi-pick-vocabulary"}) def _extract_param_specs(form_meta: JSONValue) -> list[JSONObject]: """Extract the parameters list from WDK form metadata. Handles the ``searchData`` wrapper that WDK uses for analysis-type endpoints and unwraps it if present. """ if not isinstance(form_meta, dict): return [] container = unwrap_search_data(form_meta) or form_meta params_raw = container.get("parameters") if not isinstance(params_raw, list): return [] return [p for p in params_raw if isinstance(p, dict)] def _extract_vocab_values(form_meta: JSONValue, param_name: str) -> list[str]: """Extract the allowed vocabulary values for a parameter from form metadata. WDK vocabulary params include a ``vocabulary`` field — a list of ``[value, display, null]`` triples. Returns the list of ``value`` strings (first element of each triple). Returns an empty list if the parameter is not found or has no vocabulary. """ for p in _extract_param_specs(form_meta): if p.get("name") != param_name: continue vocab = p.get("vocabulary") if not isinstance(vocab, list): return [] return [str(entry[0]) for entry in vocab if isinstance(entry, list) and entry] return [] def _build_param_type_map(form_meta: JSONValue) -> dict[str, str]: """Build a ``{param_name: wdk_type}`` map from form metadata. Used by :func:`encode_vocab_params` to know which params need JSON array encoding after a merge with user-supplied values. """ type_map: dict[str, str] = {} for p in _extract_param_specs(form_meta): name = p.get("name") ptype = p.get("type") if isinstance(name, str) and name and isinstance(ptype, str): type_map[name] = ptype return type_map def _encode_vocab_value(value: str) -> str: """Ensure a vocabulary param value is a JSON array string. ``AbstractEnumParam.convertToTerms()`` calls ``new JSONArray(stableValue)`` — plain strings cause a parse error. Multi-pick values already arrive as JSON arrays from WDK; single-pick values arrive as plain strings and must be wrapped. """ if value.startswith("["): try: json.loads(value) return value except json.JSONDecodeError, ValueError: pass return json.dumps([value])
[docs] def encode_vocab_params( params: JSONObject, form_meta: JSONValue, ) -> JSONObject: """Encode vocabulary param values as JSON arrays using form metadata. WDK's ``AbstractEnumParam.convertToTerms()`` requires all ``single-pick-vocabulary`` and ``multi-pick-vocabulary`` param values to be JSON-encoded arrays. This function ensures that encoding is applied **after** merging defaults with user params, so user-supplied plain strings don't bypass the encoding. Params whose type is not in the form metadata, or whose type is not a vocabulary type, are returned unchanged. """ type_map = _build_param_type_map(form_meta) if not type_map: return params encoded: JSONObject = {} for name, value in params.items(): ptype = type_map.get(name, "") if isinstance(name, str) else "" if ptype in _WDK_VOCAB_PARAM_TYPES and isinstance(value, str): encoded[name] = _encode_vocab_value(value) else: encoded[name] = value return encoded
def _extract_default_params(form_meta: JSONValue) -> JSONObject: """Extract parameter names and default values from WDK analysis form metadata. WDK wraps data under ``searchData`` — the response from ``GET /analysis-types/{name}`` has the structure:: { "searchData": { "parameters": [ {name, initialDisplayValue, ...}, ... ] } } Uses :func:`unwrap_search_data` to normalize the nesting, then extracts ``name``/``initialDisplayValue`` from each parameter spec. WDK's ``ParamFormatter.java`` emits ``initialDisplayValue`` (via ``JsonKeys.INITIAL_DISPLAY_VALUE``) as the stable default value. Vocabulary params (``single-pick-vocabulary``, ``multi-pick-vocabulary``) are encoded as JSON arrays per ``AbstractEnumParam.convertToTerms()``. """ defaults: JSONObject = {} for p in _extract_param_specs(form_meta): name = p.get("name") default = p.get("initialDisplayValue") if not isinstance(name, str) or not name or default is None: continue value = str(default) # Vocab params must be JSON arrays for convertToTerms(). param_type = str(p.get("type", "")) if param_type in _WDK_VOCAB_PARAM_TYPES: value = _encode_vocab_value(value) defaults[name] = value return defaults async def _execute_analysis( api: StrategyAPI, step_id: int, analysis_type: EnrichmentAnalysisType, ) -> EnrichmentResult: """Shared logic: run analysis on a step, parse results, return EnrichmentResult. Fetches the analysis form metadata from WDK to discover correct parameter names and defaults, then overrides only the GO ontology parameter when applicable. """ wdk_analysis_type = _ANALYSIS_TYPE_MAP.get(analysis_type) if not wdk_analysis_type: return EnrichmentResult( analysis_type=analysis_type, terms=[], total_genes_analyzed=0, background_size=0, ) # Fetch form metadata so we use correct parameter names and defaults. analysis_params: JSONObject = {} form_meta: JSONValue = None try: form_meta = await api.get_analysis_type(step_id, wdk_analysis_type) analysis_params = _extract_default_params(form_meta) logger.debug( "Fetched analysis form defaults", analysis_type=wdk_analysis_type, param_names=list(analysis_params.keys()), ) except Exception as exc: logger.warning( "Could not fetch analysis form metadata, using empty params", analysis_type=wdk_analysis_type, step_id=step_id, error=str(exc), ) # For GO enrichment, set the ontology parameter — but only if the # requested ontology is actually available on this site. Different # VEuPathDB sites support different GO ontologies (e.g. ToxoDB lacks # "Biological Process"). if analysis_type in _GO_ONTOLOGY_MAP: requested_ontology = _GO_ONTOLOGY_MAP[analysis_type] available = _extract_vocab_values(form_meta, "goAssociationsOntologies") if available and requested_ontology not in available: logger.info( "GO ontology not available on this site, skipping", analysis_type=analysis_type, requested=requested_ontology, available=available, ) return EnrichmentResult( analysis_type=analysis_type, terms=[], total_genes_analyzed=0, background_size=0, ) analysis_params["goAssociationsOntologies"] = json.dumps([requested_ontology]) logger.info( "Running enrichment analysis", analysis_type=analysis_type, wdk_type=wdk_analysis_type, step_id=step_id, params=analysis_params, ) # Retry on WDK 500s — the step analysis endpoint is flaky under load. import asyncio as _asyncio last_err: Exception | None = None for attempt in range(3): try: result = await api.run_step_analysis( step_id=step_id, analysis_type=wdk_analysis_type, parameters=analysis_params, ) break except Exception as exc: last_err = exc err_str = str(exc) if "500" in err_str or "502" in err_str or "503" in err_str: logger.warning( "WDK enrichment 5xx, retrying", attempt=attempt + 1, analysis_type=wdk_analysis_type, error=err_str, ) await _asyncio.sleep(2**attempt) continue raise else: if last_err is not None: raise last_err raise RuntimeError("Enrichment analysis failed after retries") rows = _extract_analysis_rows(result) terms = _parse_enrichment_terms(rows) total_analyzed, bg_size = _extract_result_totals(result) return EnrichmentResult( analysis_type=analysis_type, terms=terms, total_genes_analyzed=total_analyzed, background_size=bg_size, )
[docs] async def run_enrichment_analysis( *, site_id: str, record_type: str, search_name: str, parameters: JSONObject, analysis_type: EnrichmentAnalysisType, ) -> EnrichmentResult: """Run a single enrichment analysis on a search result set. Creates a temporary WDK strategy, runs the analysis, parses results, and cleans up. """ api = get_strategy_api(site_id) step = await api.create_step( record_type=record_type, search_name=search_name, parameters=parameters or {}, custom_name="Enrichment target", ) step_id = coerce_step_id(step) root = StepTreeNode(step_id) strategy_id: int | None = None try: created = await api.create_strategy( step_tree=root, name="Pathfinder enrichment analysis", description=None, is_internal=True, ) strategy_id = extract_wdk_id(created) return await _execute_analysis(api, step_id, analysis_type) finally: await delete_temp_strategy(api, strategy_id)
[docs] async def run_enrichment_on_step( *, site_id: str, step_id: int, analysis_type: EnrichmentAnalysisType, ) -> EnrichmentResult: """Run enrichment on an already-persisted WDK step. Used for multi-step experiments where the strategy already exists. """ api = get_strategy_api(site_id) return await _execute_analysis(api, step_id, analysis_type)
def _extract_analysis_rows(result: JSONValue) -> list[JSONObject]: """Extract tabular rows from a WDK analysis result. WDK enrichment plugins (GO, pathway, word) return rows under ``resultData``; other plugins may use ``rows``, ``data``, or ``results``. """ if not isinstance(result, dict): return [] rows = result.get( "resultData", result.get("rows", result.get("data", result.get("results", []))), ) if isinstance(rows, list): return [r for r in rows if isinstance(r, dict)] return []