Source code for veupath_chatbot.services.strategies.engine.step_builder
"""Step creation, parameter assembly, and vocabulary helpers."""
from veupath_chatbot.domain.parameters.vocab_utils import (
flatten_vocab,
normalize_vocab_key,
)
from veupath_chatbot.platform.types import JSONArray, JSONObject, JSONValue
from .base import StrategyToolsBase
[docs]
class StepBuilderMixin(StrategyToolsBase):
def _filter_search_options(
self, searches: JSONArray, query: str, limit: int = 20
) -> list[str]:
lowered = query.lower()
results: list[str] = []
for search in searches:
if not isinstance(search, dict):
continue
name_raw = search.get("name") or search.get("urlSegment")
name = name_raw if isinstance(name_raw, str) else ""
display_raw = search.get("displayName")
display = display_raw if isinstance(display_raw, str) else ""
name_lower = name.lower() if isinstance(name, str) else ""
display_lower = display.lower() if isinstance(display, str) else ""
if lowered in name_lower or lowered in display_lower:
result_value = name or display
if result_value:
results.append(result_value)
if len(results) >= limit:
break
return results
def _extract_vocab_options(
self, vocabulary: JSONObject, limit: int = 50
) -> list[str]:
options: list[str] = []
def walk(node: JSONObject) -> None:
if len(options) >= limit:
return
data_raw = node.get("data")
data = data_raw if isinstance(data_raw, dict) else {}
display_raw = data.get("display")
display = display_raw if isinstance(display_raw, str) else None
if display and display != "@@fake@@":
options.append(display)
children_raw = node.get("children")
children = children_raw if isinstance(children_raw, list) else []
for child in children:
if isinstance(child, dict):
walk(child)
if vocabulary:
walk(vocabulary)
return options
def _match_vocab_value(
self, vocabulary: JSONObject | JSONArray, value: JSONValue
) -> str:
if value is None:
return ""
target = str(value)
if not target or not vocabulary:
return target
entries = flatten_vocab(vocabulary, prefer_term=False)
for entry in entries:
display = entry.get("display")
raw_value = entry.get("value")
if target == display:
return raw_value or display or target
if target == raw_value:
return raw_value or target
normalized_target = normalize_vocab_key(target)
for entry in entries:
display = entry.get("display")
raw_value = entry.get("value")
if display and normalize_vocab_key(display) == normalized_target:
return raw_value or display
if raw_value and normalize_vocab_key(raw_value) == normalized_target:
return raw_value
return target
def _get_vocab_node_value(self, node: JSONObject) -> str:
data_raw = node.get("data")
data = data_raw if isinstance(data_raw, dict) else {}
value_raw = data.get("value")
id_raw = data.get("id")
term_raw = data.get("term")
name_raw = data.get("name")
display_raw = data.get("display")
raw_value: str | None = None
if isinstance(value_raw, str):
raw_value = value_raw
elif isinstance(id_raw, str):
raw_value = id_raw
elif isinstance(term_raw, str):
raw_value = term_raw
elif isinstance(name_raw, str):
raw_value = name_raw
elif isinstance(display_raw, str):
raw_value = display_raw
return raw_value if raw_value is not None else ""
def _find_vocab_node_for_match(
self, node: JSONObject, match: str
) -> JSONObject | None:
data_raw = node.get("data")
data = data_raw if isinstance(data_raw, dict) else {}
value_raw = data.get("value")
id_raw = data.get("id")
term_raw = data.get("term")
name_raw = data.get("name")
display_raw = data.get("display")
candidates: list[JSONValue] = []
if value_raw is not None:
candidates.append(value_raw)
if id_raw is not None:
candidates.append(id_raw)
if term_raw is not None:
candidates.append(term_raw)
if name_raw is not None:
candidates.append(name_raw)
if display_raw is not None:
candidates.append(display_raw)
for candidate in candidates:
if match == str(candidate):
return node
normalized = normalize_vocab_key(match)
for candidate in candidates:
if normalize_vocab_key(str(candidate)) == normalized:
return node
children_raw = node.get("children")
children = children_raw if isinstance(children_raw, list) else []
for child in children:
if isinstance(child, dict):
found = self._find_vocab_node_for_match(child, match)
if found:
return found
return None
def _collect_leaf_terms(self, node: JSONObject) -> list[str]:
children_raw = node.get("children")
children = children_raw if isinstance(children_raw, list) else []
if not children:
value = self._get_vocab_node_value(node)
return [value] if value else []
leaves: list[str] = []
for child in children:
if isinstance(child, dict):
leaves.extend(self._collect_leaf_terms(child))
return leaves
def _expand_leaf_values(
self,
vocabulary: JSONObject,
values: list[str],
include_parent: bool = False,
) -> list[str]:
expanded: list[str] = []
seen: set[str] = set()
for value in values:
match = str(value)
if not match:
continue
node = self._find_vocab_node_for_match(vocabulary, match)
if not node:
if match not in seen:
seen.add(match)
expanded.append(match)
continue
if include_parent:
parent_value = self._get_vocab_node_value(node)
if parent_value and parent_value not in seen:
seen.add(parent_value)
expanded.append(parent_value)
for leaf in self._collect_leaf_terms(node):
if leaf and leaf not in seen:
seen.add(leaf)
expanded.append(leaf)
return expanded