"""HTTP client for VEuPathDB WDK REST API with retries and cookies."""
import asyncio
import json
from collections.abc import Mapping, Sequence
from typing import cast
import httpx
from tenacity import (
RetryError,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.context import veupathdb_auth_token_ctx
from veupath_chatbot.platform.errors import WDKError
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONArray, JSONObject, JSONValue
logger = get_logger(__name__)
[docs]
def encode_context_param_values_for_wdk(context: JSONObject) -> JSONObject:
"""Encode contextParamValues in the format WDK expects.
Many WDK endpoints expect multi-pick values as JSON-encoded *strings*
(e.g. '["a","b"]'), not arrays.
:param context: Context dict.
:returns: Encoded context suitable for WDK wire format.
"""
encoded: JSONObject = {}
for k, v in (context or {}).items():
if v is None:
continue
if isinstance(v, str):
encoded[k] = v
elif isinstance(v, (list, dict)):
encoded[k] = json.dumps(v)
else:
encoded[k] = str(v)
return encoded
def _convert_params_for_httpx(
params: JSONObject | None,
) -> (
Mapping[
str, str | int | float | bool | None | Sequence[str | int | float | bool | None]
]
| None
):
"""Convert JSONObject params to format httpx expects.
:param params: Optional params dict.
:returns: Mapping suitable for httpx, or None if params is None.
"""
if params is None:
return None
result: dict[
str, str | int | float | bool | None | Sequence[str | int | float | bool | None]
] = {}
for k, v in params.items():
if v is None:
result[k] = None
elif isinstance(v, (str, int, float, bool)):
result[k] = v
elif isinstance(v, list):
# Convert list to sequence of compatible types
converted_list: list[str | int | float | bool | None] = []
for item in v:
if isinstance(item, (str, int, float, bool)) or item is None:
converted_list.append(item)
else:
converted_list.append(str(item))
result[k] = converted_list
else:
# Convert other types to string
result[k] = str(v)
return result
[docs]
class VEuPathDBClient:
"""HTTP client for VEuPathDB WDK REST services."""
[docs]
def __init__(
self,
base_url: str,
timeout: float = 30.0,
auth_token: str | None = None,
*,
max_connections: int = 1000,
max_keepalive_connections: int = 200,
) -> None:
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.auth_token = auth_token
self.max_connections = int(max_connections)
self.max_keepalive_connections = int(max_keepalive_connections)
self._client: httpx.AsyncClient | None = None
self._client_lock = asyncio.Lock()
self._session_initialized = False
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is not None and not self._client.is_closed:
return self._client
async with self._client_lock:
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=httpx.Timeout(self.timeout),
follow_redirects=True,
limits=httpx.Limits(
max_connections=max(1, self.max_connections),
max_keepalive_connections=max(
0, self.max_keepalive_connections
),
),
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return self._client
async def _init_wdk_session(self, client: httpx.AsyncClient) -> None:
"""Initialize a server-side WDK session (JSESSIONID).
WDK process queries (e.g. GenesByOrthologPattern) require a Tomcat
``JSESSIONID`` established through the webapp. Without it, process
queries silently return 0 results.
"""
webapp_url = self.base_url.replace("/service", "/app")
try:
await client.get(webapp_url, timeout=10)
logger.debug(
"WDK session initialized",
jsessionid=bool(client.cookies.get("JSESSIONID")),
)
except Exception:
logger.debug("Failed to initialize WDK session (non-fatal)")
[docs]
async def close(self) -> None:
"""Close HTTP client."""
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
@retry(
retry=retry_if_exception_type(
(httpx.TimeoutException, httpx.ConnectError, httpx.HTTPStatusError)
),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def _request_attempt(
self,
method: str,
path: str,
params: JSONObject | None = None,
json: JSONObject | None = None,
) -> JSONValue:
"""Single HTTP request attempt (tenacity handles retries).
Retries on transient errors: timeouts, connection failures, and
server errors (5xx). Client errors (4xx) are not retried.
"""
client = await self._get_client()
logger.debug(
"VEuPathDB request",
method=method,
path=path,
base_url=self.base_url,
)
try:
settings = get_settings()
auth_token = (
veupathdb_auth_token_ctx.get()
or self.auth_token
or settings.veupathdb_auth_token
)
# WDK authenticates via an ``Authorization`` cookie (not a header).
# Set the cookie on the client instance (not per-request) because
# httpx has deprecated per-request ``cookies=``.
if auth_token:
client.cookies.set("Authorization", auth_token)
# Initialize WDK session on first authenticated request.
if auth_token and not self._session_initialized:
self._session_initialized = True
await self._init_wdk_session(client)
httpx_params = _convert_params_for_httpx(params)
response = await client.request(
method=method,
url=path,
params=httpx_params,
json=json,
)
response.raise_for_status()
if not response.content or not response.text.strip():
return None
result = response.json()
if result is None:
return None
return cast(JSONValue, result)
except httpx.HTTPStatusError as e:
allow = e.response.headers.get("allow") or e.response.headers.get("Allow")
log_fn = logger.warning if e.response.status_code >= 500 else logger.error
log_fn(
"VEuPathDB HTTP error",
method=method,
status_code=e.response.status_code,
path=path,
allow=allow,
response_text=e.response.text[:500],
)
# 5xx: re-raise so tenacity retries (up to 3 attempts).
if e.response.status_code >= 500:
raise
# 4xx: not retryable — convert to domain error immediately.
raise WDKError(
f"{method} {path} -> HTTP {e.response.status_code}: {e.response.text[:200]}",
status=e.response.status_code,
) from e
except httpx.TimeoutException, httpx.ConnectError:
# Let tenacity retry these transient errors.
raise
except httpx.RequestError as e:
logger.error("VEuPathDB request error", error=str(e), path=path)
raise WDKError(f"Request failed: {e}", status=502) from e
async def _request(
self,
method: str,
path: str,
params: JSONObject | None = None,
json: JSONObject | None = None,
) -> JSONValue:
"""Make HTTP request with retry logic.
Wraps :meth:`_request_attempt` and converts tenacity ``RetryError``
(raised when all retry attempts are exhausted) into ``WDKError``
so callers only need to handle domain errors.
"""
try:
return await self._request_attempt(method, path, params=params, json=json)
except RetryError as e:
last = e.last_attempt.exception()
status = 502
if isinstance(last, httpx.HTTPStatusError):
status = last.response.status_code
log_fn = logger.warning if status >= 500 else logger.error
log_fn(
"VEuPathDB request failed after retries",
method=method,
path=path,
error=str(last),
)
raise WDKError(
f"Request failed after retries: {last}", status=status
) from last
[docs]
async def get(self, path: str, params: JSONObject | None = None) -> JSONValue:
"""GET request."""
return await self._request("GET", path, params=params)
[docs]
async def post(
self,
path: str,
json: JSONObject | None = None,
params: JSONObject | None = None,
) -> JSONValue:
"""POST request."""
return await self._request("POST", path, params=params, json=json)
[docs]
async def patch(self, path: str, json: JSONObject | None = None) -> JSONValue:
"""PATCH request."""
return await self._request("PATCH", path, json=json)
[docs]
async def put(self, path: str, json: JSONObject | None = None) -> JSONValue:
"""PUT request."""
return await self._request("PUT", path, json=json)
[docs]
async def delete(self, path: str) -> JSONValue:
"""DELETE request."""
return await self._request("DELETE", path)
[docs]
async def get_record_types(self, expanded: bool = False) -> JSONArray:
"""Get available record types."""
params: JSONObject | None = {"format": "expanded"} if expanded else None
return cast(JSONArray, await self.get("/record-types", params=params))
[docs]
async def get_searches(self, record_type: str) -> JSONArray:
"""Get searches for a record type."""
return cast(JSONArray, await self.get(f"/record-types/{record_type}/searches"))
[docs]
async def get_search_details(
self,
record_type: str,
search_name: str,
expand_params: bool = True,
) -> JSONObject:
"""Get detailed search configuration including parameters."""
params: JSONObject | None = {"expandParams": "true"} if expand_params else None
return cast(
JSONObject,
await self.get(
f"/record-types/{record_type}/searches/{search_name}",
params=params,
),
)
[docs]
async def get_search_details_with_params(
self,
record_type: str,
search_name: str,
context: JSONObject,
expand_params: bool = True,
) -> JSONObject:
"""Get detailed search configuration using provided parameters."""
params: JSONObject | None = {"expandParams": "true"} if expand_params else None
encoded_context = encode_context_param_values_for_wdk(context or {})
return cast(
JSONObject,
await self.post(
f"/record-types/{record_type}/searches/{search_name}",
json={"contextParamValues": encoded_context},
params=params,
),
)
[docs]
async def get_refreshed_dependent_params(
self,
record_type: str,
search_name: str,
param_name: str,
context: JSONObject,
) -> JSONObject:
"""Refresh dependent params using WDK's refreshed-dependent-params endpoint."""
encoded_context = encode_context_param_values_for_wdk(context or {})
return cast(
JSONObject,
await self.post(
f"/record-types/{record_type}/searches/{search_name}/refreshed-dependent-params",
json={
"changedParam": {
"name": param_name,
"value": encoded_context.get(param_name, ""),
},
"contextParamValues": encoded_context,
},
),
)
[docs]
async def run_search_report(
self,
record_type: str,
search_name: str,
search_config: JSONObject,
report_config: JSONObject | None = None,
) -> JSONObject:
"""Run a report on a search without creating a step or strategy.
Uses WDK's anonymous report endpoint:
``POST /record-types/{recordType}/searches/{searchName}/reports/standard``
This is significantly faster than creating steps/strategies because it
requires no user session and can be parallelized.
:param record_type: WDK record type (e.g. ``"transcript"``).
:param search_name: WDK search name (e.g. ``"GenesByTaxon"``).
:param search_config: Search config with ``parameters`` dict.
:param report_config: Report config (pagination, attributes, etc.).
:returns: Standard report response with ``meta.totalCount``.
"""
payload: JSONObject = {
"searchConfig": search_config,
"reportConfig": report_config or {},
}
result = await self.post(
f"/record-types/{record_type}/searches/{search_name}/reports/standard",
json=payload,
)
return result if isinstance(result, dict) else {}
[docs]
async def get_step_view_filters(self, user_id: str, step_id: int) -> JSONArray:
"""Get viewFilters from a step's answerSpec.
WDK stores filters as ``answerSpec.viewFilters`` on the step resource.
There is no dedicated ``/filter`` endpoint.
"""
step = await self.get(f"/users/{user_id}/steps/{step_id}")
if not isinstance(step, dict):
return []
answer_spec = step.get("answerSpec")
if not isinstance(answer_spec, dict):
return []
view_filters = answer_spec.get("viewFilters")
if not isinstance(view_filters, list):
return []
return view_filters
[docs]
async def update_step_view_filters(
self, user_id: str, step_id: int, filters: JSONArray
) -> JSONValue:
"""Update a step's viewFilters via PATCH on the step resource.
WDK manages filters through ``answerSpec.viewFilters``. The PATCH
body is ``{"answerSpec": {"viewFilters": [...]}}``.
"""
return await self.patch(
f"/users/{user_id}/steps/{step_id}",
json={"answerSpec": {"viewFilters": filters}},
)
[docs]
async def list_analysis_types(self, user_id: str, step_id: int) -> JSONArray:
"""List available analysis types for a step."""
return cast(
JSONArray,
await self.get(f"/users/{user_id}/steps/{step_id}/analysis-types"),
)
[docs]
async def get_analysis_type(
self, user_id: str, step_id: int, analysis_type: str
) -> JSONObject:
"""Get analysis form metadata for a specific analysis type."""
return cast(
JSONObject,
await self.get(
f"/users/{user_id}/steps/{step_id}/analysis-types/{analysis_type}"
),
)
[docs]
async def list_step_analyses(self, user_id: str, step_id: int) -> JSONArray:
"""List analyses that have been run on a step."""
return cast(
JSONArray, await self.get(f"/users/{user_id}/steps/{step_id}/analyses")
)
[docs]
async def create_step_analysis(
self, user_id: str, step_id: int, payload: JSONObject
) -> JSONObject:
"""Create a new analysis instance for a step."""
return cast(
JSONObject,
await self.post(f"/users/{user_id}/steps/{step_id}/analyses", json=payload),
)
[docs]
async def run_analysis_instance(
self, user_id: str, step_id: int, analysis_id: int
) -> JSONObject:
"""Kick off execution of a step analysis instance.
WDK step analyses are created first, then explicitly run.
``POST /users/{userId}/steps/{stepId}/analyses/{analysisId}/result``
returns ``{"status": "RUNNING"|...}``.
"""
return cast(
JSONObject,
await self.post(
f"/users/{user_id}/steps/{step_id}/analyses/{analysis_id}/result"
),
)
[docs]
async def get_analysis_status(
self, user_id: str, step_id: int, analysis_id: int
) -> JSONObject:
"""Poll execution status of a step analysis instance.
``GET .../analyses/{analysisId}/result/status`` returns
``{"status": "RUNNING"|"COMPLETE"|"ERROR"|...}``.
"""
return cast(
JSONObject,
await self.get(
f"/users/{user_id}/steps/{step_id}/analyses/{analysis_id}/result/status"
),
)
[docs]
async def get_analysis_result(
self, user_id: str, step_id: int, analysis_id: int
) -> JSONObject:
"""Get the result of a completed step analysis instance.
``GET .../analyses/{analysisId}/result`` returns the analysis result
JSON. Returns 204 No Content if not yet complete.
"""
return cast(
JSONObject,
await self.get(
f"/users/{user_id}/steps/{step_id}/analyses/{analysis_id}/result"
),
)
[docs]
async def run_step_report(
self,
user_id: str,
step_id: int,
report_name: str,
payload: JSONObject | None = None,
) -> JSONValue:
"""Run a report on a step."""
return await self.post(
f"/users/{user_id}/steps/{step_id}/reports/{report_name}",
json=payload or {},
)