Chat & Orchestration

Chat event handling, streaming, and orchestration of the agent flow. Coordinates between HTTP, the agent, and the strategy store.

        sequenceDiagram
    participant Client
    participant Router
    participant Orchestrator
    participant Agent
    participant WDK

    Client->>Router: POST /api/v1/chat
    Router->>Orchestrator: start_chat_stream()
    Orchestrator->>Agent: create PathfinderAgent
    loop Streaming
        Agent->>Agent: tool call (e.g. create_step)
        Agent->>WDK: WDK API call
        WDK-->>Agent: result
        Agent-->>Orchestrator: SSE event
        Orchestrator-->>Client: SSE: tool_call_start/end
    end
    Agent-->>Orchestrator: final message
    Orchestrator-->>Client: SSE: message_end
    

Overview

  • Events — Chat event type definitions

  • Orchestrator — Run the agent loop, emit SSE events

  • Streaming — Stream processing and event handling

  • Mention Context — @mention context injection

  • Utils — Shared chat utilities

Chat Events

Purpose: Chat event type definitions for the event bus.

Semantic chat events and tool-result mapping.

Uses a registry of event extractors so new tool result types can be added without modifying the dispatcher function (Open/Closed Principle).

type veupath_chatbot.services.chat.events.GetGraphFn = Callable[[str | None], StrategyGraph | None] | None
class veupath_chatbot.services.chat.events.EventExtractor(*args, **kwargs)[source]

Bases: Protocol

Protocol for event extractor functions.

__init__(*args, **kwargs)
veupath_chatbot.services.chat.events.tool_result_to_events(result, *, get_graph=None)[source]

Convert a tool result dict into a list of semantic chat events.

Each registered extractor is called in order. Extractors that return None are silently skipped.

Return type:

list[JSONObject]

Chat Orchestrator

Purpose: Orchestrate the agent loop. Send user messages to the agent, handle tool calls (create_step, build_strategy, etc.), apply graph snapshots from tool results, emit SSE events (message_start, tool_call_start, etc.). Coordinates between the agent and the strategy store.

Key functions: Main orchestration entry point, tool result handling

Chat orchestration entrypoint (service layer) — CQRS version.

Every event is persisted to Redis the moment it’s emitted. The PostgreSQL projection is updated inline. No accumulation, no finalization step.

AI-layer dependencies (agent factory, model resolver) are injected at startup via configure() — the transport layer calls this once to wire the concrete implementations. The services layer depends only on kani.Kani (third-party base class), never on concrete agent types from veupath_chatbot.ai.

veupath_chatbot.services.chat.orchestrator.configure(*, create_agent_fn, resolve_model_id_fn)[source]

Wire AI-layer implementations into the orchestrator.

Called once at application startup from the composition root.

Parameters

create_agent_fn:

Factory that builds a Kani agent for a chat turn.

resolve_model_id_fn:

Resolves the effective model ID from overrides and persisted state.

async veupath_chatbot.services.chat.orchestrator.start_chat_stream(*, message, site_id, strategy_id, user_id, user_repo, stream_repo, provider_override=None, model_override=None, reasoning_effort=None, mentions=None, disable_rag=False, disabled_tools=None, temperature=None, seed=None, context_size=None, response_tokens=None, reasoning_budget=None)[source]

Start a background chat operation and return its identifiers.

Returns (operation_id, stream_id) so the caller can hand them to the client. The client subscribes to GET /operations/{operation_id}/subscribe for SSE events.

Only fast, essential work runs synchronously (user lookup, stream resolution, operation registration, user_message emission). All heavy lifting is deferred into the background producer.

Return type:

tuple[str, str]

async veupath_chatbot.services.chat.orchestrator.cancel_chat_operation(operation_id)[source]

Cancel a running chat operation.

Returns True if the operation was found and cancelled, False otherwise.

Return type:

bool

Chat Streaming

Purpose: Stream processing for the chat agent. Handles SSE event formatting and stream lifecycle management.

HTTP streaming helpers (SSE event generation) for chat.

async veupath_chatbot.services.chat.streaming.stream_chat(agent, message, *, model_id='')[source]

Stream chat responses from the agent as SSE-friendly events.

NOTE: Does NOT emit message_start — the orchestrator (_chat_producer) emits the rich message_start with strategy context before calling us.

Return type:

AsyncIterator[JSONObject]

Mention Context

Purpose: Build rich context from @-mentions (strategies and experiments). Loads referenced entities and formats them for the agent’s system prompt.

Build rich context blocks for @-mentioned strategies and experiments.

When a user @-mentions a strategy or experiment in chat, we load the full entity and format a human-readable context block that gets appended to the system prompt so the model has complete information from the start.

async veupath_chatbot.services.chat.mention_context.build_mention_context(mentions, stream_repo)[source]

Build concatenated context blocks for all mentions.

Parameters:
  • mentions (list[dict[str, str]]) – List of {"type": ..., "id": ..., "displayName": ...} dicts.

  • stream_repo (StreamRepository) – Repository for loading stream projections.

Returns:

Markdown context string (empty if no mentions resolved).

Return type:

str

Chat Utils

Purpose: Shared utilities for chat processing: node selection parsing, message formatting, and stream helpers.

Small chat utilities (parsing, ids).

veupath_chatbot.services.chat.utils.parse_selected_nodes(message)[source]

Parse the __NODE__{json}n<text> prefix used by the UI.

Parameters:

message (str) – Chat message.

Return type:

tuple[JSONObject | None, str]