Source code for veupath_chatbot.ai.tools.result_tools
"""Tools for retrieving step results (sample records, download URLs)."""
from typing import Annotated, cast
from kani import AIParam, ai_function
from veupath_chatbot.ai.tools.wdk_error_handler import handle_wdk_step_error
from veupath_chatbot.domain.strategy.session import StrategySession
from veupath_chatbot.platform.errors import ErrorCode, WDKError
from veupath_chatbot.platform.tool_errors import tool_error
from veupath_chatbot.platform.types import JSONObject, JSONValue
from veupath_chatbot.services.wdk import (
StrategyAPI,
TemporaryResultsAPI,
get_results_api,
get_strategy_api,
)
[docs]
class ResultTools:
    """Tools for fetching step results from VEuPathDB.

    Every tool validates its arguments first, then calls the relevant WDK
    service client. Failures are reported as structured error objects built
    by ``tool_error`` / ``handle_wdk_step_error`` instead of being raised.
    """

    # Download formats accepted by ``get_download_url``.
    _DOWNLOAD_FORMATS = ("csv", "tab", "json")

    def __init__(
        self,
        session: StrategySession,
        strategy_api: StrategyAPI | None = None,
        results_api: TemporaryResultsAPI | None = None,
    ) -> None:
        """Store the session and any pre-built API clients.

        Args:
            session: Strategy session; its ``site_id`` is used to look up an
                API client when none was supplied.
            strategy_api: Client used by ``get_sample_records``. When not
                supplied, ``get_strategy_api(session.site_id)`` is called at
                tool-invocation time.
            results_api: Client used by ``get_download_url``. When not
                supplied, ``get_results_api(session.site_id)`` is called at
                tool-invocation time.
        """
        self.session = session
        self.strategy_api = strategy_api
        self.results_api = results_api

    @staticmethod
    def _step_id_error(wdk_step_id: object) -> JSONObject | None:
        """Return a validation error for a bad step ID, or ``None`` if valid."""
        # bool is a subclass of int, so it must be excluded explicitly;
        # otherwise ``True`` would be accepted as step ID 1.
        is_valid = (
            isinstance(wdk_step_id, int)
            and not isinstance(wdk_step_id, bool)
            and wdk_step_id > 0
        )
        if is_valid:
            return None
        return tool_error(
            ErrorCode.VALIDATION_ERROR,
            "wdk_step_id must be a positive integer.",
            wdk_step_id=cast(JSONValue, wdk_step_id),
            expected="positive integer",
        )

    # NOTE(review): kani appears to use the docstring of an ``@ai_function``
    # as the tool description shown to the model, so the docstrings of the two
    # tools below are kept to their original wording and further details live
    # in ``#`` comments. Confirm before expanding them.
    @ai_function()
    async def get_download_url(
        self,
        wdk_step_id: Annotated[int, AIParam(desc="WDK step ID")],
        format: Annotated[
            str,
            AIParam(desc="Download format: csv, tab, or json"),
        ] = "csv",
        attributes: Annotated[
            list[str] | None,
            AIParam(desc="Specific attributes to include"),
        ] = None,
    ) -> JSONObject:
        """Get a download URL for step results.

        The URL can be used to download results in the specified format.
        """
        # Success payload: {"downloadUrl": str, "format": str, "stepId": int}.
        # Any failure is returned as an error object; nothing is raised.
        step_id_error = self._step_id_error(wdk_step_id)
        if step_id_error is not None:
            return step_id_error

        # The type check comes first so that a non-string value is reported
        # as a validation error instead of escaping as an exception.
        if not isinstance(format, str) or format not in self._DOWNLOAD_FORMATS:
            return tool_error(
                ErrorCode.VALIDATION_ERROR,
                "format must be one of: csv, tab, json.",
                format=cast(JSONValue, format),
                allowed=cast(JSONValue, list(self._DOWNLOAD_FORMATS)),
            )

        if attributes is not None:
            # A bare string would otherwise be iterated character by
            # character and slip through the per-item check below.
            if not isinstance(attributes, (list, tuple)):
                return tool_error(
                    ErrorCode.VALIDATION_ERROR,
                    "attributes must be a list of strings when provided.",
                    attributes=cast(JSONValue, attributes),
                )
            if not attributes:
                return tool_error(
                    ErrorCode.VALIDATION_ERROR,
                    "attributes cannot be an empty list when provided.",
                    attributes=cast(JSONValue, attributes),
                )
            bad_attrs = [
                a for a in attributes if not isinstance(a, str) or not a.strip()
            ]
            if bad_attrs:
                return tool_error(
                    ErrorCode.VALIDATION_ERROR,
                    "attributes must contain non-empty strings.",
                    invalidAttributes=cast(JSONValue, bad_attrs),
                )

        try:
            results_api = self.results_api or get_results_api(self.session.site_id)
            url = await results_api.get_download_url(
                step_id=wdk_step_id,
                format=format,
                attributes=attributes,
            )
            if not isinstance(url, str) or not url:
                return tool_error(
                    ErrorCode.WDK_ERROR,
                    "VEuPathDB did not provide a usable download URL for this step. "
                    "This usually means the temporary result is still being prepared "
                    "or the upstream payload shape changed.",
                    wdk_step_id=wdk_step_id,
                    format=format,
                )
            return {
                "downloadUrl": url,
                "format": format,
                "stepId": wdk_step_id,
            }
        except WDKError as e:
            return handle_wdk_step_error(
                e,
                wdk_step_id=wdk_step_id,
                action="download",
                fallback_message="generating the download URL",
            )
        except Exception as e:
            # Tool boundary: convert any unexpected failure into an error
            # object so the caller always receives a JSON result.
            return tool_error(
                ErrorCode.WDK_ERROR,
                "Failed to generate download URL from VEuPathDB.",
                wdk_step_id=wdk_step_id,
                detail=str(e),
            )

    @ai_function()
    async def get_sample_records(
        self,
        wdk_step_id: Annotated[int, AIParam(desc="WDK step ID")],
        limit: Annotated[int, AIParam(desc="Number of records")] = 5,
    ) -> JSONObject:
        """Get a sample of records from an executed step.

        Returns the first N records to show the user what data is available.
        """
        # Success payload: {"records": list, "totalCount": int,
        # "attributes": list[str]}. Any failure is returned as an error
        # object; nothing is raised.
        step_id_error = self._step_id_error(wdk_step_id)
        if step_id_error is not None:
            return step_id_error

        # bool is excluded for the same reason as in the step-ID check.
        limit_ok = (
            isinstance(limit, int)
            and not isinstance(limit, bool)
            and 1 <= limit <= 500
        )
        if not limit_ok:
            return tool_error(
                ErrorCode.VALIDATION_ERROR,
                "limit must be an integer between 1 and 500.",
                limit=cast(JSONValue, limit),
                min=1,
                max=500,
            )

        try:
            strategy_api = self.strategy_api or get_strategy_api(self.session.site_id)
            preview_raw = await strategy_api.get_step_answer(
                step_id=wdk_step_id,
                pagination={"offset": 0, "numRecords": limit},
            )
            if not isinstance(preview_raw, dict):
                # Caught by the generic handler below and surfaced as
                # ``detail`` in the returned error object.
                raise TypeError("Expected dict from get_step_answer")
            preview: dict[str, JSONValue] = {str(k): v for k, v in preview_raw.items()}

            # Each field falls back to an empty/zero value when it is missing
            # or has an unexpected type.
            records_raw = preview.get("records", [])
            records: list[JSONValue] = (
                records_raw if isinstance(records_raw, list) else []
            )
            meta_raw = preview.get("meta", {})
            meta: dict[str, JSONValue] = meta_raw if isinstance(meta_raw, dict) else {}
            total_count_raw = meta.get("totalCount", 0)
            total_count: int = (
                total_count_raw
                if isinstance(total_count_raw, int)
                and not isinstance(total_count_raw, bool)
                else 0
            )

            # NOTE(review): this lists the top-level keys of the first record;
            # confirm that is the intended "attributes" view of the payload.
            attributes_list: list[str] = []
            if records and isinstance(records[0], dict):
                attributes_list = [str(k) for k in records[0]]
            attributes: JSONValue = cast(JSONValue, attributes_list)
            return {
                "records": records,
                "totalCount": total_count,
                "attributes": attributes,
            }
        except WDKError as e:
            return handle_wdk_step_error(
                e,
                wdk_step_id=wdk_step_id,
                action="read",
                fallback_message="reading step records",
            )
        except Exception as e:
            # Tool boundary: convert any unexpected failure into an error
            # object so the caller always receives a JSON result.
            return tool_error(
                ErrorCode.WDK_ERROR,
                "Failed to fetch sample records from VEuPathDB.",
                wdk_step_id=wdk_step_id,
                detail=str(e),
            )