"""Semantic chat events and tool-result mapping.
Uses a registry of event extractors so new tool result types can be added
without modifying the dispatcher function (Open/Closed Principle).
"""
from collections.abc import Callable
from typing import Protocol
from veupath_chatbot.domain.strategy.session import StrategyGraph
from veupath_chatbot.platform.types import JSONArray, JSONObject
# ── Event type constants ──────────────────────────────────────────────
GRAPH_SNAPSHOT = "graph_snapshot"
GRAPH_PLAN = "graph_plan"
GRAPH_CLEARED = "graph_cleared"
STRATEGY_LINK = "strategy_link"
STRATEGY_META = "strategy_meta"
STRATEGY_UPDATE = "strategy_update"
PLANNING_ARTIFACT = "planning_artifact"
CITATIONS = "citations"
REASONING = "reasoning"
EXECUTOR_BUILD_REQUEST = "executor_build_request"
WORKBENCH_GENE_SET = "workbench_gene_set"
# ── Type aliases ──────────────────────────────────────────────────────
type GetGraphFn = Callable[[str | None], StrategyGraph | None] | None
# ── Individual extractors ─────────────────────────────────────────────
def _extract_citations(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
citations = result.get("citations")
if isinstance(citations, list) and citations:
return {"type": CITATIONS, "data": {"citations": citations}}
return None
def _extract_planning_artifact(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
artifact = result.get("planningArtifact")
if artifact:
return {"type": PLANNING_ARTIFACT, "data": {"planningArtifact": artifact}}
return None
def _extract_reasoning(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
reasoning = result.get("reasoning")
if isinstance(reasoning, str) and reasoning.strip():
return {"type": REASONING, "data": {"reasoning": reasoning}}
return None
def _extract_conversation_title(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
title = result.get("conversationTitle")
if isinstance(title, str) and title.strip():
return {"type": STRATEGY_META, "data": {"name": title.strip()}}
return None
def _extract_executor_build_request(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
ebr = result.get("executorBuildRequest")
if isinstance(ebr, dict):
return {"type": EXECUTOR_BUILD_REQUEST, "data": {"executorBuildRequest": ebr}}
return None
def _extract_step_update(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
if "stepId" not in result:
return None
graph_id_raw = result.get("graphId")
graph_id = graph_id_raw if isinstance(graph_id_raw, str) else None
graph = get_graph(graph_id) if get_graph else None
all_steps: JSONArray = []
if graph is not None:
all_steps = [
{
"stepId": sid,
"kind": getattr(s, "infer_kind", lambda: None)(),
"displayName": s.display_name,
}
for sid, s in graph.steps.items()
]
return {
"type": STRATEGY_UPDATE,
"data": {"graphId": graph_id, "step": result, "allSteps": all_steps},
}
def _extract_graph_snapshot(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
# Skip extraction when auto-build succeeded — the auto-build hook
# already emitted a graph_snapshot event with WDK step IDs directly
# to the event queue. Extracting the tool result's pre-build snapshot
# here would overwrite it with stale data (isBuilt=false, no wdkStepId).
auto_build = result.get("autoBuild")
if isinstance(auto_build, dict) and auto_build.get("ok") is True:
return None
snapshot = result.get("graphSnapshot")
if not snapshot:
return None
snapshot_graph_id: str | None = None
if isinstance(snapshot, dict):
raw = snapshot.get("graphId")
snapshot_graph_id = raw if isinstance(raw, str) else None
if snapshot_graph_id is None:
raw = result.get("graphId")
snapshot_graph_id = raw if isinstance(raw, str) else None
return {
"type": GRAPH_SNAPSHOT,
"data": {"graphId": snapshot_graph_id, "graphSnapshot": snapshot},
}
def _extract_graph_plan(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
plan = result.get("plan")
if not plan:
return None
return {
"type": GRAPH_PLAN,
"data": {
"graphId": result.get("graphId"),
"plan": plan,
"name": result.get("name"),
"recordType": result.get("recordType"),
"description": result.get("description"),
},
}
def _extract_strategy_meta(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
if not result.get("graphId"):
return None
if (
"name" not in result
and "description" not in result
and "recordType" not in result
):
return None
return {
"type": STRATEGY_META,
"data": {
"graphId": result.get("graphId"),
"name": result.get("name") or result.get("graphName"),
"description": result.get("description"),
"recordType": result.get("recordType"),
"graphName": result.get("graphName"),
},
}
def _extract_cleared(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
if result.get("cleared"):
return {"type": GRAPH_CLEARED, "data": {"graphId": result.get("graphId")}}
return None
def _extract_gene_set_created(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
gene_set = result.get("geneSetCreated")
if isinstance(gene_set, dict):
return {"type": WORKBENCH_GENE_SET, "data": {"geneSet": gene_set}}
return None
def _extract_strategy_link(
result: JSONObject, *, get_graph: GetGraphFn = None
) -> JSONObject | None:
wdk_strategy_id = result.get("wdkStrategyId")
if wdk_strategy_id is None:
return None
return {
"type": STRATEGY_LINK,
"data": {
"graphId": result.get("graphId"),
"wdkStrategyId": wdk_strategy_id,
"wdkUrl": result.get("wdkUrl"),
"name": result.get("name"),
"description": result.get("description"),
"isSaved": result.get("isSaved"),
},
}
# ── Extractor registry ───────────────────────────────────────────────
# Order matters: events are emitted in registration order.
_EXTRACTORS: list[EventExtractor] = [
_extract_citations,
_extract_planning_artifact,
_extract_reasoning,
_extract_conversation_title,
_extract_executor_build_request,
_extract_step_update,
_extract_graph_snapshot,
_extract_graph_plan,
_extract_strategy_meta,
_extract_cleared,
_extract_gene_set_created,
_extract_strategy_link,
]
# ── Public dispatcher ─────────────────────────────────────────────────