Source code for veupath_chatbot.domain.parameters.vocab_utils

"""Vocabulary utilities."""

import math
import re

from veupath_chatbot.platform.errors import ValidationError
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONArray, JSONObject

logger = get_logger(__name__)


[docs] def numeric_equivalent(a: str | None, b: str | None) -> bool: """Check if two string values represent the same number. Handles precision differences between WDK vocab values (often floats) and imported strategy parameters (often full-precision decimals). """ if not a or not b: return False try: fa = float(str(a).strip()) fb = float(str(b).strip()) except Exception as exc: logger.debug("Failed to compare numeric equivalence", error=str(exc)) return False if not (math.isfinite(fa) and math.isfinite(fb)): return False return math.isclose(fa, fb, rel_tol=1e-9, abs_tol=1e-12)
def match_vocab_value(
    *,
    vocab: JSONObject | JSONArray | None,
    param_name: str,
    value: str,
) -> str:
    """Match a user-supplied value against a vocabulary, returning the canonical form.

    The vocabulary is flattened with ``flatten_vocab(vocab, prefer_term=True)``
    and searched in two passes:

    1. exact match of the stripped input against an entry's display text or
       its value;
    2. numeric equivalence (``numeric_equivalent``) against display or value.

    The passes are run separately so that an exact match anywhere in the
    vocabulary wins over an earlier entry that is merely numerically equal
    (e.g. input ``"1"`` against ``["1.0", "1"]`` resolves to ``"1"``).
    A missing (``None``) display or value never matches, not even an empty
    input.

    Args:
        vocab: Tree- or list-shaped vocabulary.  When falsy, no validation
            is performed and *value* is returned unchanged.
        param_name: Parameter name, used only in the error payload.
        value: The user-supplied value to resolve.

    Returns:
        The matched entry's value, falling back to its display text (or the
        original input) when the entry carries no value.

    Raises:
        ValidationError: If no entry matches.  The error lists up to 20
            distinct entries that contain the input as a case-insensitive
            substring.
    """
    if not vocab:
        return value

    value_norm = value.strip() if isinstance(value, str) else str(value)
    entries = flatten_vocab(vocab, prefer_term=True)

    # Pass 1: exact display / value matches take priority over everything.
    for entry in entries:
        display = entry.get("display")
        raw_value = entry.get("value")
        if display is not None and value_norm == display:
            return raw_value if raw_value is not None else (display or value)
        if raw_value is not None and value_norm == raw_value:
            return raw_value

    # Pass 2: fall back to numeric equivalence (e.g. "0.5" vs "0.50").
    for entry in entries:
        display = entry.get("display")
        raw_value = entry.get("value")
        if numeric_equivalent(value_norm, display):
            return raw_value if raw_value is not None else (display or value)
        if numeric_equivalent(value_norm, raw_value):
            return raw_value if raw_value is not None else value

    # Before failing, collect substring matches to suggest alternatives.
    # A prefix match is a special case of a substring match, so one test
    # per field is enough.
    max_suggestions = 20
    suggestions: list[str] = []
    seen: set[str] = set()
    needle = value_norm.lower()
    for entry in entries:
        display = entry.get("display") or ""
        raw_value = entry.get("value") or ""
        if needle not in display.lower() and needle not in raw_value.lower():
            continue
        candidate = raw_value or display
        if not candidate or candidate in seen:
            continue
        seen.add(candidate)
        suggestions.append(candidate)
        if len(suggestions) >= max_suggestions:
            break

    detail = f"Parameter '{param_name}' does not accept '{value}'."
    if suggestions:
        detail += f" Did you mean one of: {suggestions}"
    raise ValidationError(
        title="Invalid parameter value",
        detail=detail,
        errors=[
            {"param": param_name, "value": value, "suggestions": ", ".join(suggestions)}
        ],
    )
def normalize_vocab_key(value: str) -> str:
    """Return *value* trimmed, with whitespace runs collapsed, in lower case.

    Leading and trailing whitespace is removed, every internal run of
    whitespace becomes a single space, and the result is lower-cased.
    """
    trimmed = value.strip()
    collapsed = re.sub(r"\s+", " ", trimmed)
    return collapsed.lower()
def flatten_vocab(
    vocabulary: JSONObject | JSONArray, prefer_term: bool = False
) -> list[dict[str, str | None]]:
    """Flatten a vocabulary into a list of ``{"display", "value"}`` entries.

    Two input shapes are handled:

    * **dict (tree)** - a node with optional ``data`` and ``children`` keys.
      Only leaf nodes (nodes without dict children) produce entries, in
      left-to-right depth-first order.  An empty dict yields no entries.
    * **list** - each item is one of:
        - a list ``[value, display?]``; skipped when empty or when its first
          element is ``None``; display defaults to the value;
        - a dict with ``display`` / ``term`` / ``value`` keys;
        - anything else, whose ``str()`` is used for both fields.

    For dict-based entries, non-string ``display``, ``term`` and ``value``
    fields are treated as missing.  *prefer_term* selects whether ``term``
    (True) or ``value`` (False) is tried first for the entry's value; the
    other key is the fallback.

    Any other input type yields an empty list.
    """

    def _str_or_none(candidate: object) -> str | None:
        return candidate if isinstance(candidate, str) else None

    def _pick_value(payload: JSONObject) -> str | None:
        term = _str_or_none(payload.get("term"))
        plain = _str_or_none(payload.get("value"))
        primary, fallback = (term, plain) if prefer_term else (plain, term)
        return primary or fallback

    flattened: list[dict[str, str | None]] = []

    if isinstance(vocabulary, dict):
        if not vocabulary:
            return flattened
        # Explicit stack instead of recursion; children are pushed in
        # reverse so they are visited left-to-right.
        pending = [vocabulary]
        while pending:
            node = pending.pop()
            payload = node.get("data", {})
            if not isinstance(payload, dict):
                payload = {}
            kids = node.get("children", [])
            child_nodes = (
                [kid for kid in kids if isinstance(kid, dict)]
                if isinstance(kids, list)
                else []
            )
            if child_nodes:
                # Only include leaf nodes - parent/group nodes in WDK tree
                # vocabs are not selectable values (WDK rejects them with 422).
                pending.extend(reversed(child_nodes))
                continue
            flattened.append(
                {
                    "display": _str_or_none(payload.get("display")),
                    "value": _pick_value(payload),
                }
            )
        return flattened

    if not isinstance(vocabulary, list):
        return flattened

    for item in vocabulary:
        if isinstance(item, list):
            if not item or item[0] is None:
                continue
            code = str(item[0])
            label = code if len(item) < 2 or item[1] is None else str(item[1])
            flattened.append({"display": label, "value": code})
        elif isinstance(item, dict):
            flattened.append(
                {
                    "display": _str_or_none(item.get("display")),
                    "value": _pick_value(item),
                }
            )
        else:
            flattened.append({"display": str(item), "value": str(item)})
    return flattened
# ---------------------------------------------------------------------------
# Tree-vocabulary helpers (for dict/tree-shaped WDK vocabularies)
# ---------------------------------------------------------------------------


def _get_node_data(node: JSONObject) -> JSONObject:
    """Return the ``data`` mapping of a vocab tree node.

    Yields an empty dict when the key is absent or its value is not a dict.
    """
    payload = node.get("data", {})
    if isinstance(payload, dict):
        return payload
    return {}
def get_node_term(node: JSONObject) -> str | None:
    """Return the node's ``data.term`` as a string, or None when it is absent.

    A non-string term is converted with ``str()``.
    """
    term = _get_node_data(node).get("term")
    if term is None:
        return None
    return str(term)
def get_vocab_children(node: JSONObject) -> list[JSONObject]:
    """Return the dict-typed entries of the node's ``children`` list.

    Yields an empty list when ``children`` is absent or is not a list;
    non-dict members of the list are dropped.
    """
    candidates = node.get("children", [])
    if isinstance(candidates, list):
        return [entry for entry in candidates if isinstance(entry, dict)]
    return []
def find_vocab_node(root: JSONObject, match: str) -> JSONObject | None:
    """Find the first node whose ``term`` or ``display`` equals *match*.

    The tree is searched depth-first, testing a node before its children.
    ``term`` and ``display`` are compared after ``str()`` conversion; a
    missing field is compared as ``None``.  Returns None when nothing matches.
    """
    payload = _get_node_data(root)
    labels = tuple(
        None if raw is None else str(raw)
        for raw in (payload.get("term"), payload.get("display"))
    )
    if match in labels:
        return root

    for child in get_vocab_children(root):
        hit = find_vocab_node(child, match)
        if hit:
            return hit
    return None
def collect_leaf_terms(node: JSONObject) -> list[str]:
    """Collect the ``term`` of every leaf at or below *node*.

    A node without dict children is a leaf; it contributes its term unless
    the term is missing or empty.  Terms are returned in left-to-right
    depth-first order.
    """
    children = get_vocab_children(node)
    if children:
        return [
            leaf_term
            for child in children
            for leaf_term in collect_leaf_terms(child)
        ]

    term = get_node_term(node)
    return [term] if term else []