Source code for veupath_chatbot.integrations.veupathdb.strategy_api.analyses

"""Step analysis lifecycle for the Strategy API.

Provides :class:`AnalysisMixin` with methods to list analysis types,
create, run, poll, and retrieve step analysis results.
"""

import asyncio

from veupath_chatbot.integrations.veupathdb.strategy_api.base import StrategyAPIBase
from veupath_chatbot.platform.errors import InternalError
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONArray, JSONObject

logger = get_logger(__name__)


[docs] class AnalysisMixin(StrategyAPIBase): """Mixin providing step analysis lifecycle methods.""" _RETRIABLE_STATUSES = frozenset({"ERROR", "OUT_OF_DATE", "STEP_REVISED"})
[docs] async def list_analysis_types(self, step_id: int) -> JSONArray: """List available analysis types for a step.""" await self._ensure_session() return await self.client.list_analysis_types(self.user_id, step_id)
[docs] async def get_analysis_type(self, step_id: int, analysis_type: str) -> JSONObject: """Get analysis form metadata for a step.""" await self._ensure_session() return await self.client.get_analysis_type(self.user_id, step_id, analysis_type)
[docs] async def list_step_analyses(self, step_id: int) -> JSONArray: """List analyses that have been run on a step.""" await self._ensure_session() return await self.client.list_step_analyses(self.user_id, step_id)
async def _warmup_step(self, step_id: int) -> None: """Warm up the step answer before running an analysis. Boolean/combined steps need their answer materialized before WDK will run analyses. A zero-record standard report forces WDK to compute and cache the answer for the step (and all sub-steps). """ logger.info("Warming up step answer", step_id=step_id) warmup = await self._standard_report( step_id, {"pagination": {"offset": 0, "numRecords": 0}} ) warmup_count = None meta = warmup.get("meta") if isinstance(meta, dict): warmup_count = meta.get("totalCount") logger.info("Step answer warmed up", step_id=step_id, total_count=warmup_count) async def _create_analysis( self, step_id: int, analysis_type: str, parameters: JSONObject | None = None, custom_name: str | None = None, ) -> int: """Create a step analysis instance and return its ID. :param step_id: WDK step ID. :param analysis_type: Analysis plugin name (e.g. ``go-enrichment``). :param parameters: Analysis parameters. :param custom_name: Optional display name. :returns: The ``analysisId`` from WDK. :raises InternalError: If the response lacks an ``analysisId``. """ payload: JSONObject = { "analysisName": analysis_type, "parameters": parameters or {}, } if custom_name: payload["displayName"] = custom_name instance = await self.client.create_step_analysis( self.user_id, step_id, payload ) analysis_id = instance.get("analysisId") if isinstance(instance, dict) else None if not isinstance(analysis_id, int): raise InternalError( title="Step analysis creation failed", detail=f"No analysisId in response: {instance!r}", ) logger.info( "Created step analysis instance", step_id=step_id, analysis_type=analysis_type, analysis_id=analysis_id, ) return analysis_id async def _poll_analysis( self, step_id: int, analysis_id: int, poll_interval: float, max_wait: float, max_retries: int, ) -> None: """Poll an analysis instance until completion, retrying on transient errors. :param step_id: WDK step ID. :param analysis_id: Analysis instance ID. :param poll_interval: Seconds between status polls. :param max_wait: Maximum seconds to wait before giving up. :param max_retries: Maximum re-run attempts for retriable statuses. :raises InternalError: If the analysis fails, expires, or times out. """ elapsed = 0.0 retries = 0 while elapsed < max_wait: await asyncio.sleep(poll_interval) elapsed += poll_interval status_resp = await self.client.get_analysis_status( self.user_id, step_id, analysis_id ) status = ( str(status_resp.get("status", "")) if isinstance(status_resp, dict) else "" ) logger.debug( "Analysis status poll", analysis_id=analysis_id, status=status, elapsed=elapsed, ) if status == "COMPLETE": return if status in ("EXPIRED", "INTERRUPTED"): raise InternalError( title="Step analysis failed", detail=f"Analysis {analysis_id} ended with status: {status}", ) if status in self._RETRIABLE_STATUSES: retries += 1 logger.warning( "Analysis returned retriable status", analysis_id=analysis_id, status=status, status_response=status_resp, retry=retries, ) if retries > max_retries: self._log_analysis_failure(step_id, analysis_id) raise InternalError( title="Analysis unavailable", detail=( f"VEuPathDB could not complete this analysis " f"(returned {status} after {retries} attempts). " f"This typically happens when the gene set is too " f"small or lacks the required annotations." ), ) # WDK's requiresRerun flag means re-running the same instance # resets it to PENDING and re-executes automatically. logger.warning( "Re-running same analysis instance", analysis_id=analysis_id, status=status, retry=retries, ) await self.client.run_analysis_instance( self.user_id, step_id, analysis_id ) raise InternalError( title="Step analysis timed out", detail=f"Analysis {analysis_id} did not complete within {max_wait}s", ) def _log_analysis_failure(self, step_id: int, analysis_id: int) -> None: """Best-effort logging of analysis failure details. Fires off async tasks to fetch the analyses list and error result for debugging. Exceptions are caught and logged rather than propagated. """ async def _fetch_debug_info() -> None: try: analyses = await self.client.list_step_analyses(self.user_id, step_id) logger.error( "Step analyses list on failure", step_id=step_id, analysis_id=analysis_id, analyses=analyses, ) except Exception as exc: logger.error( "Could not list step analyses", error=str(exc), ) try: err_result = await self.client.get_analysis_result( self.user_id, step_id, analysis_id ) logger.error( "Analysis error result", analysis_id=analysis_id, error_result=err_result, ) except Exception as exc: logger.error( "Could not fetch analysis result", analysis_id=analysis_id, error=str(exc), ) # Schedule but don't await -- fire and forget for debugging asyncio.ensure_future(_fetch_debug_info())
[docs] async def run_step_analysis( self, step_id: int, analysis_type: str, parameters: JSONObject | None = None, custom_name: str | None = None, poll_interval: float = 2.0, max_wait: float = 300.0, max_retries: int = 3, ) -> JSONObject: """Create, run, and wait for a WDK step analysis to complete. WDK step analysis is a multi-phase process: 1. ``POST .../analyses`` -- create instance (returns ``analysisId``) 2. ``POST .../analyses/{id}/result`` -- kick off execution 3. ``GET .../analyses/{id}/result/status`` -- poll until COMPLETE 4. ``GET .../analyses/{id}/result`` -- retrieve results Boolean/combined steps may return ``ERROR`` on the first run because sub-step answers haven't been computed yet. Per the WDK source (``ExecutionStatus.requiresRerun``), the correct strategy is to re-run the **same** instance -- the WDK backend resets to ``PENDING`` and re-executes. :param step_id: WDK step ID (must be part of a strategy). :param analysis_type: Analysis plugin name (e.g. ``go-enrichment``). :param parameters: Analysis parameters. :param custom_name: Optional display name. :param poll_interval: Seconds between status polls. :param max_wait: Maximum seconds to wait before giving up. :param max_retries: Maximum re-run attempts for retriable statuses. :returns: Analysis result JSON. :raises InternalError: If the analysis fails or times out. """ await self._ensure_session() # Phase 0: Warm up step answer await self._warmup_step(step_id) # Phase 1: Create analysis instance analysis_id = await self._create_analysis( step_id, analysis_type, parameters, custom_name ) # Phase 2: Kick off execution await self.client.run_analysis_instance(self.user_id, step_id, analysis_id) # Phase 3: Poll for completion (raises on failure/timeout) await self._poll_analysis( step_id, analysis_id, poll_interval, max_wait, max_retries ) # Phase 4: Retrieve results result = await self.client.get_analysis_result( self.user_id, step_id, analysis_id ) return result if isinstance(result, dict) else {}