VEuPathDB Integrations¶
Client, discovery, and strategy APIs for interacting with the VEuPathDB WDK (Workspace Development Kit) REST API.
flowchart TD
PF["PathFinder Services"] --> F["Factory"]
F -->|site_id| C["WDK Client"]
F -->|site_id| D["Discovery Cache"]
C --> API["VEuPathDB WDK REST API"]
C --> SA["Strategy API"]
SA --> Steps["Steps"]
SA --> Strategies["Strategies"]
SA --> Reports["Reports"]
SA --> Records["Records"]
style PF fill:#2563eb,color:#fff
style API fill:#059669,color:#fff
Overview¶
Strategy API — Create steps, compose step trees, build strategies, run reports. The main interface for WDK strategy operations.
HTTP Client — Low-level GET/POST with auth, retries, and JSON handling.
Discovery — Cached catalog of record types and searches per site.
Factory — Obtain configured clients and discovery services by site.
Design Decisions¶
Cookie-based WDK auth
VEuPathDB’s WDK API uses cookie-based sessions, not
OAuth tokens. The HTTP client manages a JSESSIONID cookie jar per site,
authenticating via the /login endpoint. This mirrors how WDK’s browser-based
UI authenticates, ensuring compatibility with all WDK endpoints.
Factory pattern for multi-site support
PathFinder supports ~15 VEuPathDB
sites (PlasmoDB, TriTrypDB, FungiDB, etc.). The factory creates per-site clients
with the correct base URL, cookie jar, and discovery cache. Services request a
client by site_id rather than managing URLs directly.
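The per-site lookup described above can be sketched as a small cache keyed by site_id. This is only an illustration of the pattern, not PathFinder's actual factory: `SITE_BASE_URLS`, `FakeClient`, and `get_client` are hypothetical names, and the second URL is a placeholder.

```python
from dataclasses import dataclass, field
from http.cookiejar import CookieJar

# Hypothetical site table. Only the PlasmoDB service prefix appears in this
# document; the second entry is a placeholder.
SITE_BASE_URLS = {
    "plasmodb": "https://plasmodb.org/plasmo/service",
    "example": "https://example.org/service",
}


@dataclass
class FakeClient:
    """Stand-in for a site-specific WDK client (hypothetical)."""

    base_url: str
    # Each client owns its cookie jar, so sessions never leak across sites.
    cookies: CookieJar = field(default_factory=CookieJar)


_clients: dict[str, FakeClient] = {}


def get_client(site_id: str) -> FakeClient:
    """Return the cached client for a site, creating it on first use."""
    if site_id not in SITE_BASE_URLS:
        raise KeyError(f"unknown site_id: {site_id!r}")
    if site_id not in _clients:
        _clients[site_id] = FakeClient(SITE_BASE_URLS[site_id])
    return _clients[site_id]
```

Callers only ever pass a site_id, so base URLs and cookie jars stay in one place.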
Strategy API split
The WDK Strategy API is split into focused submodules (steps, strategies, reports, records, analyses, filters) rather than a monolithic client. Each submodule handles one concern, making it easier to test and modify individual operations without touching the rest.
Embeddings¶
Purpose: OpenAI-compatible embeddings wrapper with batching. Used by the RAG ingestion pipeline and catalog RAG tools.
- class veupath_chatbot.integrations.embeddings.openai_embeddings.OpenAIEmbeddings(model, batch_size=128, base_url=None)[source]¶
Bases: object
Wrapper around OpenAI-compatible embeddings with batching.
Works with OpenAI, Ollama, or any server exposing /v1/embeddings.
- __init__(model, batch_size=128, base_url=None)¶
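The batching implied by `batch_size` can be sketched as follows. This is a guess at the general shape, not the wrapper's real code; `iter_batches` is a hypothetical helper name.

```python
from collections.abc import Iterator, Sequence


def iter_batches(texts: Sequence[str], batch_size: int = 128) -> Iterator[list[str]]:
    """Yield consecutive slices of at most batch_size texts (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(texts), batch_size):
        yield list(texts[start:start + batch_size])
```

Each yielded batch would then be sent as one request to the embeddings endpoint.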
Strategy API¶
Purpose: Create and manage WDK steps and strategies. Implements the WDK REST
pattern: create unattached steps via POST, compose a tree via stepTree,
then build the strategy.
Key classes and functions:
- StrategyAPI: Main API. Methods: create_step, create_combined_step, build_strategy, get_step_count, get_step_answer
- StepTreeNode: Tree node for step composition (defined in domain.strategy.ast, re-exported from strategy_api.helpers)
- is_internal_wdk_strategy_name(): Check if strategy is a Pathfinder helper (in helpers)
- strip_internal_wdk_strategy_name(): Remove internal name prefix (in helpers)
Composed StrategyAPI class.
Aggregates all strategy API mixins into the final StrategyAPI class
that callers instantiate.
- class veupath_chatbot.integrations.veupathdb.strategy_api.api.StrategyAPI(client, user_id='current')[source]¶
Bases: StepsMixin, StrategiesMixin, ReportsMixin, AnalysisMixin, FilterMixin, RecordsMixin
API for creating and managing WDK strategies.
Provides methods to create steps, compose step trees, build strategies, run reports, manage filters, execute analyses, and fetch records. Follows the WDK REST pattern: create unattached steps, then POST a strategy with a stepTree linking them.
Inherits from StepsMixin, StrategiesMixin, ReportsMixin, AnalysisMixin, FilterMixin, RecordsMixin, and StrategyAPIBase (via MRO).
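The mixin composition can be illustrated with a reduced sketch. All class and method names here (`Base`, `StepsPart`, `ReportsPart`, `Api`) are hypothetical stand-ins; only the inheritance shape mirrors the description above.

```python
class Base:
    """Shared state, playing the role of StrategyAPIBase (hypothetical)."""

    def __init__(self, client, user_id="current"):
        self.client = client
        self.user_id = user_id


class StepsPart(Base):
    """One concern per mixin; each sees the shared state from Base."""

    def describe_steps(self) -> str:
        return f"steps for user {self.user_id}"


class ReportsPart(Base):
    def describe_reports(self) -> str:
        return f"reports for user {self.user_id}"


class Api(StepsPart, ReportsPart):
    """Composed class: no body needed, the mixins supply every method."""
```

Because every mixin derives from the same base, the base appears once at the end of the method resolution order and its `__init__` runs once.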
HTTP Client¶
Purpose: Low-level HTTP client for VEuPathDB WDK endpoints. Handles auth, request formatting, response parsing, and retries.
Key class: VEuPathDBClient — methods: get, post, put,
patch, delete; search details and report execution helpers.
HTTP client for VEuPathDB WDK REST API with retries and cookies.
- veupath_chatbot.integrations.veupathdb.client.encode_context_param_values_for_wdk(context)[source]¶
Encode contextParamValues in the format WDK expects.
Many WDK endpoints expect multi-pick values as JSON-encoded strings (e.g. '["a","b"]'), not arrays.
- Parameters:
context (JSONObject) – Context dict.
- Returns:
Encoded context suitable for WDK wire format.
- Return type:
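A minimal sketch of this encoding, assuming that only list-like values need to be turned into JSON strings. The real function may treat other value types differently; `encode_context` is a hypothetical name.

```python
import json


def encode_context(context: dict) -> dict:
    """Return a copy of context with list values encoded as JSON strings."""
    encoded = {}
    for key, value in context.items():
        if isinstance(value, (list, tuple)):
            # Compact separators give '["a","b"]' rather than '["a", "b"]'.
            encoded[key] = json.dumps(list(value), separators=(",", ":"))
        else:
            encoded[key] = value
    return encoded
```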
- class veupath_chatbot.integrations.veupathdb.client.VEuPathDBClient(base_url, timeout=30.0, auth_token=None, *, max_connections=1000, max_keepalive_connections=200)[source]¶
Bases: object
HTTP client for VEuPathDB WDK REST services.
- __init__(base_url, timeout=30.0, auth_token=None, *, max_connections=1000, max_keepalive_connections=200)[source]¶
- async get_search_details(record_type, search_name, expand_params=True)[source]¶
Get detailed search configuration including parameters.
- Return type:
- async get_search_details_with_params(record_type, search_name, context, expand_params=True)[source]¶
Get detailed search configuration using provided parameters.
- Return type:
- async get_refreshed_dependent_params(record_type, search_name, param_name, context)[source]¶
Refresh dependent params using WDK’s refreshed-dependent-params endpoint.
- Return type:
- async run_search_report(record_type, search_name, search_config, report_config=None)[source]¶
Run a report on a search without creating a step or strategy.
Uses WDK’s anonymous report endpoint: POST /record-types/{recordType}/searches/{searchName}/reports/standard
This is significantly faster than creating steps/strategies because it requires no user session and can be parallelized.
- Parameters:
record_type (str) – WDK record type (e.g. "transcript").
search_name (str) – WDK search name (e.g. "GenesByTaxon").
search_config (JSONObject) – Search config with parameters dict.
report_config (JSONObject | None) – Report config (pagination, attributes, etc.).
- Returns:
Standard report response with meta.totalCount.
- Return type:
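The request for this endpoint can be sketched as below. The path template and the `meta.totalCount` field come from the description above; the `searchConfig` and `reportConfig` body keys are an assumption about the wire format, and all three function names are hypothetical.

```python
from urllib.parse import quote


def report_path(record_type: str, search_name: str) -> str:
    """Build the anonymous standard-report path for a search."""
    return (
        f"/record-types/{quote(record_type, safe='')}"
        f"/searches/{quote(search_name, safe='')}/reports/standard"
    )


def report_body(search_config: dict, report_config=None) -> dict:
    """Assemble the POST body (key names are assumed, not confirmed here)."""
    return {"searchConfig": search_config, "reportConfig": report_config or {}}


def total_count(response: dict) -> int:
    """Read meta.totalCount from a standard report response."""
    return int(response["meta"]["totalCount"])
```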
- async get_step_view_filters(user_id, step_id)[source]¶
Get viewFilters from a step’s answerSpec.
WDK stores filters as answerSpec.viewFilters on the step resource. There is no dedicated /filter endpoint.
- Return type:
- async update_step_view_filters(user_id, step_id, filters)[source]¶
Update a step’s viewFilters via PATCH on the step resource.
WDK manages filters through answerSpec.viewFilters. The PATCH body is {"answerSpec": {"viewFilters": [...]}}.
- Return type:
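Since filters live in a single array, adding or replacing one means rebuilding the whole list before the PATCH. A rough sketch, assuming each filter is a dict identified by a `name` key (that element shape is an assumption, and `patch_body_with_filter` is a hypothetical helper):

```python
def patch_body_with_filter(current_filters: list, new_filter: dict) -> dict:
    """Build a PATCH body that adds new_filter, replacing any same-named filter."""
    kept = [f for f in current_filters if f.get("name") != new_filter.get("name")]
    return {"answerSpec": {"viewFilters": kept + [new_filter]}}
```

Removing a filter follows the same read, modify, PATCH cycle with the filter simply left out of the list.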
- async list_analysis_types(user_id, step_id)[source]¶
List available analysis types for a step.
- Return type:
- async get_analysis_type(user_id, step_id, analysis_type)[source]¶
Get analysis form metadata for a specific analysis type.
- Return type:
- async list_step_analyses(user_id, step_id)[source]¶
List analyses that have been run on a step.
- Return type:
- async create_step_analysis(user_id, step_id, payload)[source]¶
Create a new analysis instance for a step.
- Return type:
- async run_analysis_instance(user_id, step_id, analysis_id)[source]¶
Kick off execution of a step analysis instance.
WDK step analyses are created first, then explicitly run. POST /users/{userId}/steps/{stepId}/analyses/{analysisId}/result returns {"status": "RUNNING"|...}.
- Return type:
- async get_analysis_status(user_id, step_id, analysis_id)[source]¶
Poll execution status of a step analysis instance.
GET .../analyses/{analysisId}/result/status returns {"status": "RUNNING"|"COMPLETE"|"ERROR"|...}.
- Return type:
Discovery¶
Purpose: Cached catalog of record types, searches, and parameter specs per site. Used by tools to discover available questions and parameters without repeated WDK calls.
Key class: SearchCatalog — load, get_record_types,
get_searches, get_search_details
Discovery and caching of record types, searches, and parameters.
- class veupath_chatbot.integrations.veupathdb.discovery.SearchCatalog(site_id)[source]¶
Bases: object
Cached catalog of searches for a site.
- find_search(record_type, search_name)[source]¶
Find a specific search.
- Parameters:
- Return type:
JSONObject | None
- find_record_type_for_search(search_name)[source]¶
Find which record type owns a search (global lookup).
Mirrors WDK’s WdkModel.getQuestionByName(): iterates all cached record types to find the one containing the given search.
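The global lookup amounts to a linear scan over the cached catalog. The sketch below assumes the cache is a dict mapping record type names to lists of search dicts with a `urlSegment` key; that layout and the function name `owner_of_search` are illustrative, not the catalog's real internals.

```python
def owner_of_search(catalog: dict, search_name: str):
    """Return the record type whose searches include search_name, else None."""
    for record_type, searches in catalog.items():
        for search in searches:
            if search.get("urlSegment") == search_name:
                return record_type
    return None
```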
- class veupath_chatbot.integrations.veupathdb.discovery.DiscoveryService[source]¶
Bases: object
Service for discovering and caching site metadata.
Factory¶
Purpose: Factory for obtaining configured VEuPathDB clients and discovery services. Manages site routing and client lifecycle.
Key function: get_wdk_client()
Note
get_discovery_service() is defined in integrations.veupathdb.discovery,
not in the factory module.
Integration entrypoints for WDK clients and services.
- class veupath_chatbot.integrations.veupathdb.factory.DiscoveryService[source]¶
Bases: object
Service for discovering and caching site metadata.
- class veupath_chatbot.integrations.veupathdb.factory.SiteInfo(id, name, display_name, base_url, project_id, is_portal)[source]¶
Bases: object
VEuPathDB site information.
- classmethod from_config(site_id, cfg)[source]¶
Construct a SiteInfo from a validated SiteConfig.
- Return type:
- async veupath_chatbot.integrations.veupathdb.factory.close_all_clients()[source]¶
Close all cached WDK clients.
- veupath_chatbot.integrations.veupathdb.factory.get_discovery_service()[source]¶
Get the global discovery service.
- Return type:
- veupath_chatbot.integrations.veupathdb.factory.get_results_api(site_id)[source]¶
Get a temporary results API wrapper for a site.
- Parameters:
site_id (str) – VEuPathDB site identifier.
- Return type:
- veupath_chatbot.integrations.veupathdb.factory.get_strategy_api(site_id)[source]¶
Get a Strategy API wrapper for a site.
- Parameters:
site_id (str) – VEuPathDB site identifier.
- Return type:
Strategy API — Submodules¶
The Strategy API is split into focused submodules for steps, strategies, reports, analyses, filters, records, and shared helpers.
StrategyAPI base class with shared infrastructure.
Provides initialization, parameter normalization, and session management that all mixin classes depend on.
- class veupath_chatbot.integrations.veupathdb.strategy_api.base.StrategyAPIBase(client, user_id='current')[source]¶
Bases: object
Base infrastructure for StrategyAPI.
Provides __init__, parameter normalization, and WDK session management. Mixin classes inherit from this to access shared state.
- __init__(client, user_id='current')[source]¶
Initialize the strategy API.
- Parameters:
client (VEuPathDBClient) – VEuPathDB HTTP client (site-specific).
user_id (str) – WDK user ID; defaults to "current" (resolved at first use).
Step creation methods for the Strategy API.
Provides StepsMixin with methods to create search steps,
combined (boolean) steps, transform steps, and datasets.
- class veupath_chatbot.integrations.veupathdb.strategy_api.steps.StepsMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing step creation and dataset upload methods.
- async create_dataset(ids)[source]¶
Upload an ID list as a WDK dataset and return the dataset ID.
WDK DatasetParam parameters (type input-dataset) expect an integer dataset ID, not raw IDs. This method creates a transient dataset via POST /users/{userId}/datasets and returns the integer ID that can be used as the parameter value.
- Parameters:
ids (list[str]) – List of record IDs (e.g. gene locus tags).
- Returns:
Integer dataset ID.
- Raises:
InternalError – If dataset creation fails or no ID is returned.
- Return type:
- async create_step(record_type, search_name, parameters, custom_name=None, wdk_weight=None)[source]¶
Create an unattached step.
- Parameters:
record_type (str) – Record type (e.g., “gene”, “transcript”).
search_name (str) – Name of the search question.
parameters (JSONObject) – Search parameters.
custom_name (str | None) – Optional custom name for the step.
wdk_weight (int | None) – Optional WDK weight for result ranking in combined strategies.
- Returns:
Created step data with stepId.
- Return type:
- async create_combined_step(primary_step_id, secondary_step_id, boolean_operator, record_type, custom_name=None, wdk_weight=None)[source]¶
Create a combined step (boolean operation).
- Parameters:
primary_step_id (int) – ID of the primary (left) step.
secondary_step_id (int) – ID of the secondary (right) step.
boolean_operator (str) – One of INTERSECT, UNION, MINUS, RMINUS, LONLY, RONLY.
record_type (str) – WDK record type.
custom_name (str | None) – Optional custom name.
wdk_weight (int | None) – Optional WDK weight for result ranking in combined strategies.
- Returns:
Created step data.
- Return type:
- async create_transform_step(input_step_id, transform_name, parameters, record_type='transcript', custom_name=None, wdk_weight=None)[source]¶
Create a transform step.
WDK requires that input-step (AnswerParam) parameters are set to the empty string "" when creating new steps; the actual input wiring happens via the stepTree at strategy creation time.
This method fetches the search metadata to discover AnswerParam names, strips any stale values, and forces them to "".
- Parameters:
input_step_id (int) – ID of the input step (for logging; wiring happens in the strategy stepTree).
transform_name (str) – Name of the transform question.
parameters (JSONObject) – Transform parameters.
record_type (str) – WDK record type for the search details lookup.
custom_name (str | None) – Optional custom name.
wdk_weight (int | None) – Optional WDK weight for result ranking in combined strategies.
- Returns:
Created step data.
- Return type:
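The AnswerParam blanking described above can be sketched as follows. It assumes the search metadata exposes a `parameters` list whose entries carry `name` and `type` keys; that shape and the helper name `blank_answer_params` are assumptions for illustration.

```python
def blank_answer_params(search_details: dict, parameters: dict) -> dict:
    """Return a copy of parameters with every input-step param forced to ''."""
    answer_param_names = {
        p["name"]
        for p in search_details.get("parameters", [])
        if p.get("type") == "input-step"
    }
    cleaned = dict(parameters)
    for name in answer_param_names:
        # Stale step IDs are overwritten; missing params are added as "".
        cleaned[name] = ""
    return cleaned
```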
Strategy CRUD methods for the Strategy API.
Provides StrategiesMixin with methods to create, read, update,
and delete WDK strategies.
- class veupath_chatbot.integrations.veupathdb.strategy_api.strategies.StrategiesMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing strategy CRUD methods.
- async create_strategy(step_tree, name, description=None, is_public=False, is_saved=False, is_internal=False)[source]¶
Create a strategy from a step tree.
- Parameters:
- Returns:
Created strategy data.
- Return type:
- async update_strategy(strategy_id, step_tree=None, name=None)[source]¶
Update a strategy.
- Return type:
Core report and answer methods for the Strategy API.
Provides ReportsMixin with methods to run reports, fetch step
answers and records, and get step counts.
- class veupath_chatbot.integrations.veupathdb.strategy_api.reports.ReportsMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing report, answer, and step count methods.
- async run_step_report(step_id, report_name, config=None)[source]¶
Run a report on a step.
- Return type:
- async get_step_answer(step_id, attributes=None, pagination=None)[source]¶
Get answer records for a step via the standard report endpoint.
Convenience wrapper around get_step_records().
- async get_step_records(step_id, attributes=None, tables=None, pagination=None, sorting=None)[source]¶
Get paginated records for a step with configurable attributes and sorting.
Module-level helpers for WDK strategy operations.
Internal strategy name tagging utilities, shared constants, and
re-export of StepTreeNode (canonical definition lives in
domain.strategy.ast).
- class veupath_chatbot.integrations.veupathdb.strategy_api.helpers.StepTreeNode(step_id, primary_input=None, secondary_input=None)[source]¶
Bases: object
Node in a WDK step tree.
Represents a single step with optional primary (and for combines, secondary) input references. Used to build the stepTree payload for WDK strategy creation. Pure data structure with no I/O.
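A simplified stand-in for such a node, showing how a tree might serialize into a nested payload. The constructor fields match the signature above; the class name `Node`, the `to_payload` method, and the camelCase keys (`stepId`, `primaryInput`, `secondaryInput`) are assumptions about the wire format rather than confirmed details.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Illustrative step tree node (not the real StepTreeNode)."""

    step_id: int
    primary_input: Optional["Node"] = None
    secondary_input: Optional["Node"] = None

    def to_payload(self) -> dict:
        """Serialize this node and its inputs recursively; absent inputs are omitted."""
        payload: dict = {"stepId": self.step_id}
        if self.primary_input is not None:
            payload["primaryInput"] = self.primary_input.to_payload()
        if self.secondary_input is not None:
            payload["secondaryInput"] = self.secondary_input.to_payload()
        return payload
```

A combined step over two searches would be `Node(3, Node(1), Node(2))`, with the boolean step at the root.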
- veupath_chatbot.integrations.veupathdb.strategy_api.helpers.is_internal_wdk_strategy_name(name)[source]¶
Check if a WDK strategy name is a Pathfinder internal helper strategy.
Internal strategies are used for control tests and step counts. They are tagged with the __pathfinder_internal__: prefix.
- async veupath_chatbot.integrations.veupathdb.strategy_api.helpers.resolve_wdk_user_id(client)[source]¶
Resolve the concrete WDK user ID from a /users/current call.
Some WDK deployments reject mutations on /users/current/.... This helper resolves the actual numeric user ID once so callers can use /users/{userId}/... for all subsequent requests.
- Returns:
Resolved user ID string, or None if resolution failed.
- Return type:
str | None
- veupath_chatbot.integrations.veupathdb.strategy_api.helpers.strip_internal_wdk_strategy_name(name)[source]¶
Remove the internal strategy name prefix if present.
- veupath_chatbot.integrations.veupathdb.strategy_api.helpers.tag_internal_wdk_strategy_name(name)[source]¶
Add the internal strategy name prefix if not already present.
- Return type:
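The three name helpers can be approximated in a few lines. This sketch assumes the prefix is exactly `__pathfinder_internal__:` as described above; the short function names are hypothetical and the real helpers may handle edge cases differently.

```python
INTERNAL_PREFIX = "__pathfinder_internal__:"  # assumed exact prefix


def is_internal(name) -> bool:
    """True if name is a string carrying the internal prefix."""
    return isinstance(name, str) and name.startswith(INTERNAL_PREFIX)


def tag(name: str) -> str:
    """Add the prefix unless it is already present (idempotent)."""
    return name if is_internal(name) else INTERNAL_PREFIX + name


def strip(name: str) -> str:
    """Remove the prefix if present; otherwise return name unchanged."""
    return name[len(INTERNAL_PREFIX):] if is_internal(name) else name
```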
Step analysis lifecycle for the Strategy API.
Provides AnalysisMixin with methods to list analysis types,
create, run, poll, and retrieve step analysis results.
- class veupath_chatbot.integrations.veupathdb.strategy_api.analyses.AnalysisMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing step analysis lifecycle methods.
- async get_analysis_type(step_id, analysis_type)[source]¶
Get analysis form metadata for a step.
- Return type:
- async run_step_analysis(step_id, analysis_type, parameters=None, custom_name=None, poll_interval=2.0, max_wait=300.0, max_retries=3)[source]¶
Create, run, and wait for a WDK step analysis to complete.
WDK step analysis is a multi-phase process:
- POST .../analyses – create instance (returns analysisId)
- POST .../analyses/{id}/result – kick off execution
- GET .../analyses/{id}/result/status – poll until COMPLETE
- GET .../analyses/{id}/result – retrieve results
Boolean/combined steps may return ERROR on the first run because sub-step answers haven’t been computed yet. Per the WDK source (ExecutionStatus.requiresRerun), the correct strategy is to re-run the same instance: the WDK backend resets to PENDING and re-executes.
- Parameters:
step_id (int) – WDK step ID (must be part of a strategy).
analysis_type (str) – Analysis plugin name (e.g. go-enrichment).
parameters (JSONObject | None) – Analysis parameters.
custom_name (str | None) – Optional display name.
poll_interval (float) – Seconds between status polls.
max_wait (float) – Maximum seconds to wait before giving up.
max_retries (int) – Maximum re-run attempts for retriable statuses.
- Returns:
Analysis result JSON.
- Raises:
InternalError – If the analysis fails or times out.
- Return type:
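The run, poll, and re-run cycle can be sketched with injected callables in place of real HTTP calls. This is a simplification under stated assumptions: only `ERROR` is treated as retriable, the `start` and `get_status` callables are hypothetical, and generic exceptions stand in for InternalError.

```python
import asyncio
import time


async def run_with_retries(start, get_status, poll_interval=2.0, max_wait=300.0, max_retries=3):
    """Start an analysis, poll its status, and re-run the same instance on ERROR."""
    deadline = time.monotonic() + max_wait
    reruns = 0
    await start()
    while time.monotonic() < deadline:
        status = await get_status()
        if status == "COMPLETE":
            return status
        if status == "ERROR":
            if reruns >= max_retries:
                raise RuntimeError("analysis failed after retries")
            reruns += 1
            await start()  # re-run the same instance rather than creating a new one
        await asyncio.sleep(poll_interval)
    raise TimeoutError("analysis did not complete in time")
```

A caller would fetch the result payload after this returns.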
Step filter CRUD operations for the Strategy API.
Provides FilterMixin with methods to create, delete, and list
step filters via WDK’s answerSpec.viewFilters mechanism.
WDK does NOT have dedicated filter endpoints (/filter, /filter/{name}).
Filters are managed by reading/patching the step’s answerSpec.viewFilters
array through the step resource itself.
- class veupath_chatbot.integrations.veupathdb.strategy_api.filters.FilterMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing step filter CRUD via answerSpec.viewFilters.
- async list_step_filters(step_id)[source]¶
List viewFilters for a step.
Reads the step resource and extracts answerSpec.viewFilters.
- Return type:
Record type info, single record, and column distribution methods.
Provides RecordsMixin with methods for record type metadata,
individual record retrieval, and column value distributions.
- class veupath_chatbot.integrations.veupathdb.strategy_api.records.RecordsMixin(client, user_id='current')[source]¶
Bases: StrategyAPIBase
Mixin providing record type info, single record, and distribution methods.
- async get_record_type_info(record_type)[source]¶
Get expanded record type info including attributes and tables.
- Parameters:
record_type (str) – WDK record type (e.g. “gene”).
- Returns:
Record type metadata with attribute fields.
- Return type:
- async get_single_record(record_type, primary_key, attributes=None, tables=None)[source]¶
Fetch a single record by its primary key.
WDK’s POST /record-types/{type}/records requires primaryKey, attributes, and tables arrays in the request body. When attributes or tables are not provided we send empty arrays which tells WDK to return the default set.
- Parameters:
- Returns:
Full record with requested attributes/tables.
- Return type:
- async get_column_distribution(step_id, column_name)[source]¶
Get distribution data for a column using the byValue column reporter.
Uses POST .../columns/{col}/reports/byValue which returns a histogram array and statistics object. This replaces the deprecated filter-summary endpoint.
Not all columns support the byValue reporter (e.g. overview or composite columns). When WDK returns an error, an empty result is returned so the frontend can show a friendly message.
- Parameters:
- Returns:
{histogram: [...], statistics: {...}}
- Return type:
Parameter Utils¶
Purpose: Parameter parsing and formatting utilities for WDK parameter specs.
Shared WDK parameter value normalization and JSON shape helpers.
WDK expects all parameter values as strings. Multi-pick enum params use a
JSON array string (see AbstractEnumParam.getExternalStableValue in the
WDK source). Single-pick params use a plain string value.
Also provides small helpers for extracting canonical names from WDK entity
dicts (record types, searches) so callers don’t repeat the
urlSegment || name pattern.
- veupath_chatbot.integrations.veupathdb.param_utils.normalize_param_value(value)[source]¶
Convert a JSON parameter value to the string format WDK expects.
- veupath_chatbot.integrations.veupathdb.param_utils.wdk_entity_name(obj)[source]¶
Extract the canonical name from a WDK entity dict.
WDK record-type and search objects expose urlSegment (preferred) and name as identifiers. This helper returns whichever is available, preferring urlSegment.
- Parameters:
obj (JSONObject | JSONValue) – WDK entity dict (record type or search).
- Returns:
Canonical name string, or "" if neither field exists.
- Return type:
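The `urlSegment || name` fallback can be sketched like this. It is an approximation of the documented behaviour, with the assumptions that non-dict inputs and empty strings both yield ""; `entity_name` is a hypothetical name.

```python
def entity_name(obj) -> str:
    """Return urlSegment if present, else name, else the empty string."""
    if not isinstance(obj, dict):
        return ""
    for key in ("urlSegment", "name"):
        value = obj.get(key)
        if isinstance(value, str) and value:
            return value
    return ""
```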
Site Router¶
Purpose: Route requests to the correct VEuPathDB site by site ID. Manages site URL mappings.
Site routing: choose portal vs component sites intelligently.
- class veupath_chatbot.integrations.veupathdb.site_router.SiteConfig(*, name='', display_name='', base_url='', project_id='', is_portal=False)[source]¶
Bases: BaseModel
Validated configuration for a single VEuPathDB site.
- model_config = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class veupath_chatbot.integrations.veupathdb.site_router.RoutingConfig(*, portal_timeout=120.0, component_timeout=30.0)[source]¶
Bases: BaseModel
Validated routing/timeout configuration.
- model_config = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class veupath_chatbot.integrations.veupathdb.site_router.SitesConfig(*, sites=<factory>, default_site='veupathdb', routing=<factory>)[source]¶
Bases: BaseModel
Top-level sites configuration parsed from YAML.
- sites: dict[str, SiteConfig]¶
- routing: RoutingConfig¶
- model_config = {}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- veupath_chatbot.integrations.veupathdb.site_router.load_sites_config(config_path=None)[source]¶
Load and validate sites configuration from YAML.
- Parameters:
config_path (str | None) – Optional path to a YAML file. If unset or empty, uses the bundled sites.yaml next to this module.
- Returns:
Validated SitesConfig model.
- Return type:
- class veupath_chatbot.integrations.veupathdb.site_router.SiteInfo(id, name, display_name, base_url, project_id, is_portal)[source]¶
Bases: object
VEuPathDB site information.
- classmethod from_config(site_id, cfg)[source]¶
Construct a SiteInfo from a validated SiteConfig.
- Return type:
- class veupath_chatbot.integrations.veupathdb.site_router.SiteRouter[source]¶
Bases: object
Router for choosing appropriate VEuPathDB site.
- get_client(site_id)[source]¶
Get or create HTTP client for a site.
- Parameters:
site_id (str) – VEuPathDB site identifier.
- Return type:
Site Search¶
Purpose: Site-level search across VEuPathDB. Used for gene lookup and catalog discovery.
Integration for VEuPathDB “site search” service.
This is the same backend used by the web UI route /app/search.
Important: this service is hosted at the site origin root (e.g. https://plasmodb.org/site-search), not under the WDK service prefix (e.g. https://plasmodb.org/plasmo/service).
The site-search endpoint is GET-only – POST returns HTTP 500. Parameters are passed as query-string key-value pairs:
- searchText – search query
- docType – restrict to a document type (e.g. “gene”, “search”)
- offset / numRecords – pagination
- restrictToProject – site project id filter
- restrictSearchToOrganisms – comma-separated organism names
- async veupath_chatbot.integrations.veupathdb.site_search.close_site_search_client()[source]¶
Close the shared site-search HTTP client (call during app shutdown).
- async veupath_chatbot.integrations.veupathdb.site_search.query_site_search(site_id, *, search_text, document_type=None, organisms=None, limit=20, offset=0)[source]¶
Query the site’s /site-search endpoint and return parsed JSON.
The VEuPathDB site-search service only accepts GET requests with query-string parameters. POST returns HTTP 500.
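Building the GET URL from these parameters might look like the sketch below. The query keys are the ones listed in the module description; how the function arguments map onto them (for example `limit` to `numRecords`) is an assumption, as are the names `site_search_url`, `origin`, and `project_id`.

```python
from urllib.parse import urlencode


def site_search_url(origin, search_text, document_type=None, organisms=None,
                    limit=20, offset=0, project_id=None) -> str:
    """Compose a site-search GET URL at the site origin root."""
    params = {"searchText": search_text, "offset": offset, "numRecords": limit}
    if document_type:
        params["docType"] = document_type
    if project_id:
        params["restrictToProject"] = project_id
    if organisms:
        # Organism names are sent as one comma-separated value.
        params["restrictSearchToOrganisms"] = ",".join(organisms)
    return f"{origin.rstrip('/')}/site-search?{urlencode(params)}"
```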
Temporary Results¶
Purpose: Create and manage temporary WDK results for step reports, downloads, and analysis. Handles user session resolution.
VEuPathDB /temporary-results helpers for downloads.
- class veupath_chatbot.integrations.veupathdb.temporary_results.TemporaryResultsAPI(client, user_id='current')[source]¶
Bases: StrategyAPIBase
API for creating and managing temporary result downloads.
Inherits session management (client, user_id, _ensure_session) from StrategyAPIBase.
- async create_temporary_result(step_id, reporter='standard', format_config=None)[source]¶
Create a temporary result for download.
- Parameters:
step_id (int) – Step ID to export.
reporter (str) – WDK reporter name (standard, fullRecord, etc.).
format_config (JSONObject | None) – Reporter-specific configuration.
- Returns:
Temporary result info with ID and download URL.
- Return type:
Vectorstore — Ingest Pipeline¶
Generic concurrent pipeline utility for ingest workers.
Provides a reusable asyncio-based producer/worker/consumer pattern with:
- A pool of concurrent workers that process items via process_fn
- A single consumer that batches results and flushes via flush_fn
- Sentinel-based shutdown with proper queue joining
- async veupath_chatbot.integrations.vectorstore.ingest.pipeline.run_concurrent_pipeline(*, items, process_fn, flush_fn, concurrency, batch_size, on_error=None)[source]¶
Run items through a concurrent worker pool, batching results to flush_fn.
Parameters¶
- items:
The input items to process.
- process_fn:
Async callable that transforms a single input item into an output. Return None to skip the item (it will not be flushed).
- flush_fn:
Async callable that receives a batch of outputs to persist/upsert.
- concurrency:
Maximum number of concurrent worker tasks.
- batch_size:
Number of outputs to accumulate before calling flush_fn.
- on_error:
Optional sync callback invoked when process_fn raises. If not provided, errors are silently swallowed and the item is skipped.
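The worker and consumer arrangement described by these parameters can be approximated with two asyncio queues and a sentinel. This is an independent sketch, not the module's code: it waits on the worker tasks with `asyncio.gather` instead of queue joining, and the `on_error(item, exc)` callback signature is an assumption.

```python
import asyncio


async def run_pipeline(items, process_fn, flush_fn, concurrency, batch_size, on_error=None):
    """Process items with a worker pool and flush results in batches."""
    in_q: asyncio.Queue = asyncio.Queue()
    out_q: asyncio.Queue = asyncio.Queue()
    stop = object()  # sentinel marking the end of a queue

    async def worker():
        while True:
            item = await in_q.get()
            if item is stop:
                return
            try:
                result = await process_fn(item)
            except Exception as exc:
                if on_error is not None:
                    on_error(item, exc)
                continue  # a failed item is skipped
            if result is not None:
                await out_q.put(result)

    async def consumer():
        batch = []
        while True:
            result = await out_q.get()
            if result is stop:
                break
            batch.append(result)
            if len(batch) >= batch_size:
                await flush_fn(batch)
                batch = []
        if batch:
            await flush_fn(batch)  # flush the final partial batch

    for item in items:
        in_q.put_nowait(item)
    for _ in range(concurrency):
        in_q.put_nowait(stop)  # one sentinel per worker

    consumer_task = asyncio.create_task(consumer())
    await asyncio.gather(*(worker() for _ in range(concurrency)))
    await out_q.put(stop)  # all workers are done; tell the consumer to finish
    await consumer_task
```

Using a single consumer keeps flush calls sequential, so `flush_fn` does not need to be safe against concurrent invocation.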
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_fetch.fetch_record_types_and_searches(client)[source]¶
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_fetch.fetch_search_details(client, rt_name, search_name, summary_unwrapped)[source]¶
- Return type:
tuple[JSONObject, str | None]
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_index.filter_existing_record_types(qdrant_client, record_type_docs, site_id)[source]¶
- Return type:
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_index.filter_existing_searches(qdrant_client, searches_to_fetch, site_id)[source]¶
- Return type:
list[tuple[str, JSONObject]]
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_index.upsert_record_type_docs(store, embedder, record_type_docs)[source]¶
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_index.upsert_search_docs_batch(store, embedder, buffered)[source]¶
- async veupath_chatbot.integrations.vectorstore.ingest.wdk_index.run_search_indexing_pipeline(*, searches_to_fetch, make_doc, store, embedder, concurrency, batch_size, site_id)[source]¶
- veupath_chatbot.integrations.vectorstore.ingest.wdk_transform.build_record_type_doc(site_id, rt)[source]¶
- Return type:
JSONObject | None
- veupath_chatbot.integrations.vectorstore.ingest.wdk_transform.build_search_doc(site_id, rt_name, s, details_unwrapped, details_error, base_url)[source]¶
- Return type:
JSONObject | None