Source code for veupath_chatbot.services.experiment.helpers

"""Shared helpers for experiment execution and analysis.

Provides safe numeric coercion, WDK ID extraction, gene-list extraction and
enrichment utilities, and the progress callback type alias.
"""

from collections.abc import Awaitable, Callable

from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject
from veupath_chatbot.services.experiment.types import GeneInfo

logger = get_logger(__name__)

# Type alias for an async progress reporter: a callable that accepts a single
# JSONObject payload and returns an awaitable resolving to None.
ProgressCallback = Callable[[JSONObject], Awaitable[None]]
"""Emits an SSE-friendly progress event dict."""


def safe_int(val: object, default: int = 0) -> int:
    """Safely convert a value to int, returning *default* on failure.

    ``int`` inputs (including ``bool``, which is an ``int`` subclass) are
    returned unchanged. Floats are truncated toward zero. Strings are first
    parsed as exact integers so that large values keep full precision, and
    only then as floats (to accept forms such as ``"12.7"`` or ``"1e3"``).

    :param val: Value to convert.
    :param default: Value returned when *val* cannot be converted (unsupported
        type, unparsable string, ``nan`` or infinity).
    :returns: The converted integer, or *default*.
    """
    if isinstance(val, int):
        return val
    if isinstance(val, str):
        # Exact integer parse first: routing through float() would round
        # integers beyond 2**53 to the nearest representable double.
        try:
            return int(val)
        except ValueError:
            pass
    if isinstance(val, (float, str)):
        try:
            # int() raises ValueError for nan and OverflowError for infinity.
            return int(float(val))
        except (ValueError, TypeError, OverflowError):
            pass
    return default
def safe_float(val: object, default: float = 0.0) -> float:
    """Safely convert a value to float, returning *default* on failure.

    Non-finite values (``inf``, ``-inf``, ``nan``) are replaced with *default*
    because they are not JSON-serializable and PostgreSQL rejects them in
    JSON columns.

    :param val: Value to convert; only ``int``, ``float`` and ``str`` inputs
        are converted, anything else yields *default*.
    :param default: Value returned when *val* cannot be converted or the
        result is not finite.
    :returns: A finite float, or *default*.
    """
    import math

    if not isinstance(val, (int, float, str)):
        return default
    try:
        result = float(val)
    except (ValueError, OverflowError):
        # ValueError: unparsable string. OverflowError: an int too large to
        # be represented as a double.
        return default
    return result if math.isfinite(result) else default
[docs] def extract_wdk_id(payload: object, key: str = "id") -> int | None: """Extract an integer ID from a WDK JSON response. WDK formatters (``StepFormatter``, ``StrategyService``, etc.) emit entity IDs as Java longs (always ``int`` in JSON) under a known key (typically ``"id"`` or ``"strategyId"``). :param payload: WDK response dict. :param key: JSON key containing the integer ID. :returns: The integer ID, or ``None`` if not found. """ if isinstance(payload, dict): raw = payload.get(key) if isinstance(raw, int): return raw return None
def coerce_step_id(payload: JSONObject | None) -> int:
    """Return the step ID carried by a WDK step-creation response.

    Delegates to :func:`extract_wdk_id` with its default key and turns a
    missing ID into an error instead of ``None``.

    :param payload: WDK step-creation response.
    :returns: Step ID.
    :raises ValueError: If step ID not found.
    """
    if (found := extract_wdk_id(payload)) is not None:
        return found
    raise ValueError("Failed to extract step ID from WDK response")
def _extract_gene_list(
    result: JSONObject,
    section: str,
    key: str,
    *,
    fallback_from_controls: bool = False,
    all_controls: list[str] | None = None,
    hit_ids: set[str] | None = None,
) -> list[GeneInfo]:
    """Extract a gene ID list from control-test result and wrap as GeneInfo.

    :param result: Control-test result; ``result[section][key]`` is expected
        to hold a list of gene IDs.
    :param section: Top-level key of *result* to read.
    :param key: Key inside the section holding the ID list.
    :param fallback_from_controls: When true and the ID list is unavailable,
        derive the list from *all_controls* minus *hit_ids*.
    :param all_controls: Control IDs used for the fallback.
    :param hit_ids: IDs to exclude from *all_controls* in the fallback.
    :returns: One ``GeneInfo`` per non-``None`` ID (stringified), the fallback
        list, or an empty list.
    """
    section_data = result.get(section)
    ids_raw = section_data.get(key) if isinstance(section_data, dict) else None
    if isinstance(ids_raw, list):
        return [GeneInfo(id=str(g)) for g in ids_raw if g is not None]
    # Same fallback whether the section or the key is missing/malformed.
    if fallback_from_controls and all_controls and hit_ids is not None:
        return [GeneInfo(id=g) for g in all_controls if g not in hit_ids]
    return []


def _extract_id_set(
    result: JSONObject,
    section: str,
    key: str,
) -> set[str]:
    """Extract a set of IDs from a control-test result section.

    :returns: Stringified non-``None`` entries of ``result[section][key]``,
        or an empty set when the section is not a dict or the value is not
        a list.
    """
    section_data = result.get(section)
    if not isinstance(section_data, dict):
        return set()
    ids_raw = section_data.get(key)
    if not isinstance(ids_raw, list):
        return set()
    return {str(g) for g in ids_raw if g is not None}


def _meta_str(meta: JSONObject, field: str) -> str:
    """Return ``meta[field]`` as a string, or ``""`` if absent or ``None``."""
    value = meta.get(field)
    return "" if value is None else str(value)


def _enrich_list(
    genes: list[GeneInfo],
    lookup: dict[str, JSONObject],
) -> list[GeneInfo]:
    """Replace bare GeneInfo objects with enriched versions from *lookup*.

    For each gene with a non-empty entry in *lookup*, a new ``GeneInfo`` is
    built from the ``geneName``, ``organism`` and ``product`` fields of that
    entry. A field that is missing, ``None`` or empty keeps the gene's
    existing value. Genes without an entry are passed through unchanged.

    :param genes: Genes to enrich.
    :param lookup: Metadata records keyed by gene ID.
    :returns: A new list, same order and length as *genes*.
    """
    enriched: list[GeneInfo] = []
    for g in genes:
        meta = lookup.get(g.id)
        if not meta:
            enriched.append(g)
            continue
        enriched.append(
            GeneInfo(
                id=g.id,
                # _meta_str maps None to "" so a null field falls back to
                # the existing value instead of becoming the text "None".
                name=_meta_str(meta, "geneName") or g.name,
                organism=_meta_str(meta, "organism") or g.organism,
                product=_meta_str(meta, "product") or g.product,
            )
        )
    return enriched


async def _resolve_gene_lookup(
    site_id: str,
    gene_lists: tuple[list[GeneInfo], ...],
) -> dict[str, JSONObject]:
    """Resolve all unique gene IDs across multiple lists into a lookup dict.

    :param site_id: Site identifier forwarded to ``resolve_gene_ids``.
    :param gene_lists: Gene lists whose IDs should be resolved together.
    :returns: Mapping of ``geneId`` to its record, built from the
        ``"records"`` list of the resolver response. Empty when there are no
        IDs or the response carries no record list.
    """
    from veupath_chatbot.services.gene_lookup.wdk import resolve_gene_ids

    # dict.fromkeys de-duplicates while preserving first-seen order.
    all_ids: list[str] = list(
        dict.fromkeys(g.id for gl in gene_lists for g in gl)
    )
    if not all_ids:
        return {}

    resolved = await resolve_gene_ids(site_id=site_id, gene_ids=all_ids)
    records = resolved.get("records")
    if not isinstance(records, list):
        return {}

    lookup: dict[str, JSONObject] = {}
    for rec in records:
        if isinstance(rec, dict):
            gid = rec.get("geneId")
            # Records without a string geneId cannot be keyed; skip them.
            if isinstance(gid, str):
                lookup[gid] = rec
    return lookup
async def extract_and_enrich_genes(
    *,
    site_id: str,
    result: JSONObject,
    negative_controls: list[str] | None = None,
) -> tuple[list[GeneInfo], list[GeneInfo], list[GeneInfo], list[GeneInfo]]:
    """Build the four gene lists of a control-test result, enriched via WDK.

    The lists are read from the ``"positive"`` and ``"negative"`` sections of
    *result* (``"intersectionIds"`` and ``"missingIdsSample"``). When the
    negative section has no missing-ID sample, the true negatives are derived
    from *negative_controls* minus the negative intersection IDs. Enrichment
    is best-effort: if the lookup raises, a warning is logged and the bare
    ID-only lists are returned.

    :param site_id: Site identifier used for the metadata lookup.
    :param result: Control-test result.
    :param negative_controls: Negative control IDs used for the fallback.
    :returns: (true_positive, false_negative, false_positive, true_negative)
    """
    true_pos = _extract_gene_list(result, "positive", "intersectionIds")
    false_neg = _extract_gene_list(result, "positive", "missingIdsSample")
    false_pos = _extract_gene_list(result, "negative", "intersectionIds")
    negative_hits = _extract_id_set(result, "negative", "intersectionIds")
    true_neg = _extract_gene_list(
        result,
        "negative",
        "missingIdsSample",
        fallback_from_controls=True,
        all_controls=negative_controls,
        hit_ids=negative_hits,
    )
    bare = (true_pos, false_neg, false_pos, true_neg)

    try:
        lookup = await _resolve_gene_lookup(site_id, bare)
    except Exception as exc:
        # Enrichment is optional; fall back to the ID-only lists.
        logger.warning("Gene enrichment failed, returning bare IDs", error=str(exc))
        return bare

    if not lookup:
        return bare
    return (
        _enrich_list(true_pos, lookup),
        _enrich_list(false_neg, lookup),
        _enrich_list(false_pos, lookup),
        _enrich_list(true_neg, lookup),
    )