Source code for veupath_chatbot.services.strategies.engine.graph_ops

"""Graph traversal, node/edge operations, serialization, and snapshots."""

from typing import cast

from veupath_chatbot.domain.strategy.ast import PlanStepNode, StrategyAST
from veupath_chatbot.domain.strategy.explain import explain_operation
from veupath_chatbot.domain.strategy.session import StrategyGraph
from veupath_chatbot.platform.types import JSONArray, JSONObject, JSONValue

from .base import StrategyToolsBase
from .graph_integrity import find_root_step_ids


def _serialize_step_decorations(step: PlanStepNode, info: JSONObject) -> None:
    """Copy a step's non-empty decorations into its serialized dict.

    For each of ``filters``, ``analyses`` and ``reports`` (in that order),
    when the attribute on ``step`` is truthy, ``info`` gains a same-named key
    holding the list of each item's ``to_dict()`` result. Empty or ``None``
    collections leave ``info`` untouched.

    :param step: Step node whose decorations are read.
    :param info: Serialized step dict, mutated in place.
    """
    for key in ("filters", "analyses", "reports"):
        items = getattr(step, key)
        if not items:
            continue
        info[key] = [item.to_dict() for item in items]


class GraphOpsMixin(StrategyToolsBase):
    """Serialization helpers for strategy steps, plans and graph snapshots.

    Provides derived strategy names/descriptions, per-step serialization for
    tool responses and graph snapshots, and helpers that attach plan/graph
    context to response payloads.
    """

    def _derive_strategy_name(
        self,
        record_type: str | None,
        root_step: PlanStepNode,
    ) -> str:
        """Derive a strategy name from the root step.

        Search/transform roots use the display name, falling back to the
        search name. Combine roots use the display name, falling back to
        ``explain_operation(operator)`` when an operator is set. When nothing
        usable is found the name is ``"<Record Type> strategy"`` (or
        ``"Strategy"`` without a record type). The title-cased record type is
        prefixed when it does not already occur (case-insensitively) in the
        name.

        :param record_type: Record type of the strategy, or None.
        :param root_step: Root step of the strategy.
        :returns: Name truncated to at most 120 characters.
        """
        kind = root_step.infer_kind()
        base = None
        if kind in ("search", "transform"):
            base = root_step.display_name or root_step.search_name
        elif kind == "combine":
            if root_step.operator is not None:
                base = root_step.display_name or explain_operation(root_step.operator)
            else:
                base = root_step.display_name

        base = (base or "").strip()
        if not base:
            base = f"{record_type.title()} strategy" if record_type else "Strategy"
        if record_type and record_type.lower() not in base.lower():
            base = f"{record_type.title()} - {base}"
        return base[:120]

    def _derive_strategy_description(
        self,
        record_type: str | None,
        root_step: PlanStepNode,
    ) -> str:
        """Derive a one-sentence strategy description from the root step.

        The sentence has the form ``"<Verb> [<record_type>] results for
        <summary>."`` where the verb is ``Find`` (search), ``Transform``
        (transform) or ``Combine`` (any other kind).

        :param record_type: Record type of the strategy, or None.
        :param root_step: Root step of the strategy.
        :returns: Description sentence.
        """
        kind = root_step.infer_kind()
        if kind == "search":
            verb = "Find"
            summary = root_step.display_name or root_step.search_name
        elif kind == "transform":
            verb = "Transform"
            summary = root_step.display_name or root_step.search_name
        else:
            verb = "Combine"
            if root_step.operator is not None:
                summary = explain_operation(root_step.operator)
            else:
                summary = root_step.display_name or "combine"

        summary = (summary or "").strip()
        if not summary:
            summary = "results"
        if record_type:
            return f"{verb} {record_type} results for {summary}."
        return f"{verb} results for {summary}."

    def _build_step_fields(
        self,
        graph: StrategyGraph | None,
        step: PlanStepNode,
        *,
        id_key: str = "stepId",
        always_include_inputs: bool = False,
    ) -> JSONObject:
        """Build the common serialized fields for a step node.

        Shared by ``_serialize_step`` (tool responses) and
        ``_serialize_graph_step`` (graph snapshots).

        :param graph: Strategy graph (for WDK IDs / counts), or None.
        :param step: Step node to serialize.
        :param id_key: Key name for the step ID ("stepId" or "id").
        :param always_include_inputs: If True, always emit primaryInputStepId /
            secondaryInputStepId (graph-snapshot mode). If False, only emit
            them when structurally relevant (combine/transform).
        :returns: Serialized step dict.
        """
        kind = step.infer_kind()
        info: JSONObject = {
            id_key: step.id,
            "kind": kind,
            "displayName": step.display_name or step.search_name,
        }

        # searchName is only meaningful for leaf / transform steps.
        if kind != "combine":
            info["searchName"] = step.search_name

        # Structural relationships.
        include_primary = kind in ("combine", "transform") or always_include_inputs
        include_secondary = kind == "combine" or always_include_inputs
        if kind == "combine":
            info["operator"] = step.operator.value if step.operator else None
        if include_primary:
            info["primaryInputStepId"] = (
                step.primary_input.id if step.primary_input else None
            )
        if include_secondary:
            info["secondaryInputStepId"] = (
                step.secondary_input.id if step.secondary_input else None
            )

        # WDK-aligned fields (populated after build_strategy). Compare against
        # None explicitly so the check does not depend on the graph's truthiness.
        if graph is not None:
            wdk_step_id = graph.wdk_step_ids.get(step.id)
            if wdk_step_id is not None:
                info["wdkStepId"] = wdk_step_id
            info["isBuilt"] = wdk_step_id is not None
            estimated_size = graph.step_counts.get(step.id)
            if estimated_size is not None:
                info["estimatedSize"] = estimated_size

        # Only include heavy fields when non-empty.
        if step.parameters:
            info["parameters"] = step.parameters
        _serialize_step_decorations(step, info)
        return info

    def _serialize_step(
        self,
        graph: StrategyGraph,
        step: PlanStepNode,
    ) -> JSONObject:
        """Serialize a step with WDK-aligned fields.

        Includes ``estimatedSize`` and ``wdkStepId`` when available on the
        graph (populated after ``build_strategy``). Omits noisy fields
        (parameters, filters, analyses, reports) when empty.

        :param graph: Strategy graph with WDK IDs and counts.
        :param step: Step node to serialize.
        :returns: Serialized step dict.
        """
        return self._build_step_fields(graph, step, id_key="stepId")

    def _serialize_graph_step(
        self,
        step: PlanStepNode,
        *,
        graph: StrategyGraph | None = None,
    ) -> JSONObject:
        """Serialize a step for graph snapshots.

        Same enrichments as ``_serialize_step`` (WDK IDs, counts) but keyed by
        ``id`` instead of ``stepId`` for graph-snapshot compatibility, and
        always emitting both input step IDs.

        :param step: Step node to serialize.
        :param graph: Graph that owns ``step``. When omitted, the graph is
            looked up with ``self._get_graph(None)`` (the previous behavior).
            Callers that already hold the graph should pass it so the WDK
            IDs / counts come from that graph.
        :returns: Serialized step dict keyed by id.
        """
        if graph is None:
            graph = self._get_graph(None)
        return self._build_step_fields(
            graph, step, id_key="id", always_include_inputs=True
        )

    def _build_graph_snapshot(self, graph: StrategyGraph) -> JSONObject:
        """Build a full snapshot (steps, edges, plan metadata) of ``graph``.

        Calls ``_build_context_plan``, which may update
        ``graph.current_strategy`` and ``graph.name`` as a side effect.

        :param graph: Strategy graph to snapshot.
        :returns: Dict with graphId, graphName, recordType, name, description,
            rootStepId, steps and edges.
        """
        plan_payload = self._build_context_plan(graph)
        if plan_payload:
            record_type = plan_payload.get("recordType")
            name = plan_payload.get("name")
            description = plan_payload.get("description")
        else:
            record_type = None
            name = graph.name
            description = None

        # rootStepId should only be set when the working graph has exactly
        # one output (one root). Do not guess based on "last_step_id" when
        # multiple roots exist, otherwise the UI/agent may incorrectly assume
        # the strategy is done.
        roots = find_root_step_ids(graph)
        root_step_id = roots[0] if len(roots) == 1 else None

        # Serialize against the graph being snapshotted, not a separately
        # looked-up one, so IDs/counts always match the steps emitted here.
        steps = [
            self._serialize_graph_step(step, graph=graph)
            for step in graph.steps.values()
        ]

        # One edge per present input; primary before secondary for each step.
        edges: JSONArray = []
        for step in graph.steps.values():
            inputs = (
                ("primary", step.primary_input),
                ("secondary", step.secondary_input),
            )
            for edge_kind, source in inputs:
                if source is not None:
                    edges.append(
                        {
                            "sourceId": source.id,
                            "targetId": step.id,
                            "kind": edge_kind,
                        }
                    )

        return {
            "graphId": graph.id,
            "graphName": graph.name,
            "recordType": record_type,
            "name": name,
            "description": description,
            "rootStepId": root_step_id,
            "steps": cast(JSONValue, steps),
            "edges": edges,
        }

    def _build_context_plan(self, graph: StrategyGraph) -> JSONObject | None:
        """Build the plan payload for ``graph`` and sync it onto the graph.

        Returns None when no root can be chosen, the root step cannot be
        found, or the graph has no record type. Otherwise a new
        ``StrategyAST`` is stored on ``graph.current_strategy`` and
        ``graph.name`` is updated to the resolved name (when non-empty).

        :param graph: Strategy graph to build the plan from (mutated).
        :returns: Dict with graphId, graphName, plan, recordType, name and
            description, or None.
        """
        # Prefer the single subtree root from graph.roots; fall back to
        # last_step_id when roots is ambiguous or not yet populated.
        if len(graph.roots) == 1:
            root_id = next(iter(graph.roots))
        elif graph.last_step_id:
            root_id = graph.last_step_id
        else:
            return None

        root_step = graph.get_step(root_id)
        if not root_step:
            return None

        record_type = graph.record_type
        if not record_type:
            return None

        current = graph.current_strategy
        name = current.name if current else graph.name
        description = current.description if current else None

        # Replace placeholder names / missing descriptions with derived ones.
        if self._is_placeholder_name(name):
            name = self._derive_strategy_name(record_type, root_step)
        if not description:
            description = self._derive_strategy_description(record_type, root_step)

        strategy = StrategyAST(
            record_type=record_type,
            root=root_step,
            name=name,
            description=description,
        )
        graph.current_strategy = strategy
        graph.name = name or graph.name

        return {
            "graphId": graph.id,
            "graphName": graph.name,
            "plan": strategy.to_dict(),
            "recordType": record_type,
            "name": name,
            "description": description,
        }

    def _step_ok_response(self, graph: StrategyGraph, step: PlanStepNode) -> JSONObject:
        """Serialize a step as an ``ok=True`` response with a full graph snapshot.

        This combines the three-step pattern used after successful step
        mutations: serialize the step, mark ok, wrap with graph context.

        :param graph: Strategy graph that owns the step.
        :param step: Step node to serialize.
        :returns: Response dict including plan payload and ``graphSnapshot``.
        """
        response = self._serialize_step(graph, step)
        response["ok"] = True
        return self._with_full_graph(graph, response)

    def _with_plan_payload(
        self, graph: StrategyGraph, payload: JSONObject
    ) -> JSONObject:
        """Merge plan context for ``graph`` into ``payload`` (in place).

        When a plan payload can be built, its keys overwrite those in
        ``payload``. Otherwise only ``graphId`` / ``graphName`` are filled in,
        and only if not already present.

        :param graph: Strategy graph providing the context.
        :param payload: Response dict to enrich; mutated and returned.
        :returns: The same ``payload`` object.
        """
        plan_payload = self._build_context_plan(graph)
        if plan_payload:
            payload.update(plan_payload)
        else:
            payload.setdefault("graphId", graph.id)
            payload.setdefault("graphName", graph.name)
        return payload

    def _with_full_graph(self, graph: StrategyGraph, payload: JSONObject) -> JSONObject:
        """Add plan context and a ``graphSnapshot`` entry to ``payload``.

        :param graph: Strategy graph providing the context.
        :param payload: Response dict to enrich; mutated and returned.
        :returns: The same ``payload`` object with ``graphSnapshot`` set.
        """
        response = self._with_plan_payload(graph, payload)
        response["graphSnapshot"] = self._build_graph_snapshot(graph)
        return response