Source code for veupath_chatbot.services.strategies.session_factory
"""Helpers for hydrating in-memory strategy session context for agents."""
from shared_py.defaults import DEFAULT_STREAM_NAME
from veupath_chatbot.domain.strategy.ast import from_dict
from veupath_chatbot.domain.strategy.session import (
StrategyGraph,
StrategySession,
hydrate_graph_from_steps_data,
)
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject
logger = get_logger(__name__)
[docs]
def build_strategy_session(
    *,
    site_id: str,
    strategy_graph: JSONObject | None,
) -> StrategySession:
    """Build a StrategySession from a persisted strategy graph payload.

    This mirrors UI persistence semantics: prefer canonical ``plan`` if
    present/parseable, fall back to snapshot-derived ``steps`` + ``rootStepId``
    hydration when plan is missing/invalid.

    Keys read from ``strategy_graph`` (camelCase first, snake_case accepted
    as an alternative where listed):

    * ``id`` / ``graphId`` -- graph identifier (``"unknown"`` if neither is
      a non-empty string).
    * ``name`` -- display name (``DEFAULT_STREAM_NAME`` if not a string).
    * ``plan`` -- canonical plan, parsed with ``from_dict``.
    * ``steps``, ``rootStepId`` / ``root_step_id``,
      ``recordType`` / ``record_type`` -- persisted snapshot used for
      fallback hydration.
    * ``wdkStrategyId`` and per-step ``wdkStepId`` / ``resultCount`` /
      ``estimatedSize`` -- WDK build state.

    Failures while loading the plan or hydrating from steps are logged as
    warnings and never propagated (best-effort loading).

    :param site_id: VEuPathDB site identifier.
    :param strategy_graph: Persisted graph payload; ``None`` or an empty
        mapping means there is nothing to load.
    :returns: The new session. If ``session.get_graph(None)`` yields nothing
        after loading, a graph named ``DEFAULT_STREAM_NAME`` is created.
    """
    session = StrategySession(site_id)

    if strategy_graph:
        graph_id = _first_str(strategy_graph, "id", "graphId") or "unknown"
        name_value = strategy_graph.get("name")
        # An empty-string name is kept as-is; only non-strings get the default.
        name = name_value if isinstance(name_value, str) else DEFAULT_STREAM_NAME

        graph = StrategyGraph(graph_id, name, site_id)

        plan = strategy_graph.get("plan")
        if isinstance(plan, dict) and plan:
            _load_plan_into_graph(
                graph, plan, fallback_name=name, graph_id=graph_id
            )

        # If plan wasn't available/valid, fall back to persisted steps/
        # rootStepId hydration.
        if not graph.steps:
            _hydrate_graph_from_snapshot(
                graph, strategy_graph, name=name, graph_id=graph_id
            )

        # Restore WDK build state from the persisted projection.
        # This runs regardless of whether the graph was loaded from plan
        # or from steps hydration, because the plan AST doesn't carry
        # WDK step IDs -- only the persisted steps snapshot does.
        _restore_wdk_state(graph, strategy_graph)

        session.add_graph(graph)

    if not session.get_graph(None):
        session.create_graph(DEFAULT_STREAM_NAME)
    return session


def _first_str(payload: JSONObject, *keys: str) -> str | None:
    """Return the value of the first key in *keys* that holds a ``str``."""
    for key in keys:
        value = payload.get(key)
        if isinstance(value, str):
            return value
    return None


def _as_int(value: object) -> int | None:
    """Return *value* if it is a genuine ``int``, otherwise ``None``.

    ``bool`` is a subclass of ``int``; a JSON ``true``/``false`` must not be
    mistaken for an ID or a count, so booleans are rejected.
    """
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return None


def _load_plan_into_graph(
    graph: StrategyGraph,
    plan: JSONObject,
    *,
    fallback_name: str,
    graph_id: str,
) -> None:
    """Populate *graph* from a canonical plan; log and continue on failure."""
    try:
        strategy = from_dict(plan)
        # Do every read that might raise *before* touching the graph, so a
        # malformed plan cannot leave the graph half-populated (which would
        # also suppress the caller's steps-based fallback).
        steps_by_id = {step.id: step for step in strategy.get_all_steps()}
        root_id = strategy.root.id
        display_name = strategy.name or fallback_name

        graph.current_strategy = strategy
        graph.name = display_name
        graph.steps = steps_by_id
        graph.recompute_roots()
        graph.last_step_id = root_id
        graph.save_history(f"Loaded graph: {display_name}")
    except Exception as e:  # deliberate best-effort: never fail session build
        logger.warning(
            "Failed to load graph plan",
            error=str(e),
            graph_id=graph_id,
        )


def _hydrate_graph_from_snapshot(
    graph: StrategyGraph,
    payload: JSONObject,
    *,
    name: str,
    graph_id: str,
) -> None:
    """Hydrate *graph* from the persisted ``steps`` snapshot in *payload*."""
    raw_steps = payload.get("steps")
    steps_data = raw_steps if isinstance(raw_steps, list) else []
    try:
        hydrate_graph_from_steps_data(
            graph,
            steps_data,
            root_step_id=_first_str(payload, "rootStepId", "root_step_id"),
            record_type=_first_str(payload, "recordType", "record_type"),
        )
        if graph.steps:
            graph.save_history(f"Loaded graph from persisted steps: {name}")
    except Exception as e:  # deliberate best-effort: never fail session build
        logger.warning(
            "Failed to hydrate graph from persisted steps",
            error=str(e),
            graph_id=graph_id,
        )


def _restore_wdk_state(graph: StrategyGraph, payload: JSONObject) -> None:
    """Copy the WDK strategy ID, step IDs and result counts onto *graph*."""
    wdk_strategy_id = _as_int(payload.get("wdkStrategyId"))
    if wdk_strategy_id is not None:
        graph.wdk_strategy_id = wdk_strategy_id

    persisted_steps = payload.get("steps")
    if not isinstance(persisted_steps, list):
        return

    for step in persisted_steps:
        if not isinstance(step, dict):
            continue
        raw_id = step.get("id")
        if raw_id is None:
            continue
        sid = str(raw_id)
        # Only steps that were actually loaded into the graph get WDK state.
        if sid not in graph.steps:
            continue

        wdk_step_id = _as_int(step.get("wdkStepId"))
        if wdk_step_id is not None:
            graph.wdk_step_ids[sid] = wdk_step_id

        # Fall back to estimatedSize only when resultCount is absent/null.
        # Using ``or`` here would discard a legitimate count of 0.
        raw_count = step.get("resultCount")
        if raw_count is None:
            raw_count = step.get("estimatedSize")
        result_count = _as_int(raw_count)
        if result_count is not None:
            graph.step_counts[sid] = result_count