Source code for veupath_chatbot.integrations.vectorstore.dependent_vocab_cache

import time

from veupath_chatbot.integrations.embeddings.openai_embeddings import embed_one
from veupath_chatbot.integrations.vectorstore.bootstrap import get_embedding_dim
from veupath_chatbot.integrations.vectorstore.collections import (
    WDK_DEPENDENT_VOCAB_CACHE_V1,
)
from veupath_chatbot.integrations.vectorstore.qdrant_store import (
    QdrantStore,
    context_hash,
    point_uuid,
)
from veupath_chatbot.integrations.veupathdb.client import (
    encode_context_param_values_for_wdk,
)
from veupath_chatbot.integrations.veupathdb.factory import get_wdk_client
from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.errors import WDKError
from veupath_chatbot.platform.types import JSONObject, JSONValue


async def ensure_dependent_vocab_collection(store: QdrantStore) -> None:
    """Make sure the dependent-vocab cache collection exists in Qdrant.

    Entries in this collection are looked up by key
    (site/rt/search/param/contextHash). Vectors are still stored so the
    Qdrant schema stays consistent and similarity search remains possible
    later.

    Args:
        store: Qdrant store wrapper on which the collection is ensured.
    """
    settings = get_settings()
    # The vector size is derived from the configured embeddings model.
    vector_size = await get_embedding_dim(settings.embeddings_model)
    await store.ensure_collection(
        name=WDK_DEPENDENT_VOCAB_CACHE_V1,
        vector_size=vector_size,
    )
async def get_dependent_vocab_authoritative_cached(
    *,
    site_id: str,
    record_type: str,
    search_name: str,
    param_name: str,
    context_values: JSONObject,
    store: QdrantStore | None = None,
) -> JSONObject:
    """Return authoritative dependent vocab, cached in Qdrant.

    - Cache key is the *WDK-wire* encoded context values (json-string
      encoding for lists/dicts).
    - On cache miss, calls WDK `/refreshed-dependent-params` (via existing
      client) and stores the result.
    - If the site-specific call raises ``WDKError`` and ``site_id`` is not
      ``"veupathdb"``, the request is retried once against the
      ``"veupathdb"`` client.

    Args:
        site_id: Site whose WDK service is queried first; also part of the
            cache key.
        record_type: WDK record type name.
        search_name: WDK search name.
        param_name: Parameter whose dependent vocabulary is requested.
        context_values: Context parameter values; a falsy value is treated
            as an empty mapping.
        store: Optional Qdrant store; when omitted one is built via
            ``QdrantStore.from_settings()``.

    Returns:
        The cached or freshly built payload with an extra ``"cache"`` key set
        to ``"hit"`` or ``"miss"``.

    Raises:
        WDKError: If the call fails for ``site_id == "veupathdb"``. Errors
            raised by the fallback call propagate unchanged.
    """
    store = store or QdrantStore.from_settings()
    await ensure_dependent_vocab_collection(store)

    wdk_context = encode_context_param_values_for_wdk(context_values or {})
    ch = context_hash(wdk_context)
    key = f"{site_id}:{record_type}:{search_name}:{param_name}:{ch}"
    pid = point_uuid(key)

    cached = await store.get(collection=WDK_DEPENDENT_VOCAB_CACHE_V1, point_id=pid)
    if cached:
        payload_value = cached.get("payload")
        # A cached point without a dict payload is treated as a miss and is
        # overwritten by the upsert below.
        if isinstance(payload_value, dict):
            payload_dict: JSONObject = {str(k): v for k, v in payload_value.items()}
            return {"cache": "hit", **payload_dict}

    client = get_wdk_client(site_id)
    try:
        response = await client.get_refreshed_dependent_params(
            record_type, search_name, param_name, wdk_context
        )
    except WDKError:
        if site_id == "veupathdb":
            raise
        # Rebind `client` to the fallback so that `sourceUrl` below records
        # the service that actually produced the response.
        client = get_wdk_client("veupathdb")
        response = await client.get_refreshed_dependent_params(
            record_type, search_name, param_name, wdk_context
        )

    payload: JSONObject = {
        "siteId": site_id,
        "recordType": record_type,
        "searchName": search_name,
        "paramName": param_name,
        "contextParamValues": wdk_context,
        "contextHash": ch,
        "wdkResponse": response,
        "ingestedAt": int(time.time()),
        "sourceUrl": f"{client.base_url}/record-types/{record_type}/searches/{search_name}/refreshed-dependent-params",
    }

    # Minimal vector (not used for correctness; just to satisfy collection
    # vector config).
    vec = await embed_one(
        text=f"{site_id} {record_type} {search_name} {param_name}",
        model=get_settings().embeddings_model,
    )
    # Convert list[float] to list[JSONValue] for type compatibility.
    vec_json: list[JSONValue] = [float(x) for x in vec]
    await store.upsert(
        collection=WDK_DEPENDENT_VOCAB_CACHE_V1,
        points=[{"id": pid, "vector": vec_json, "payload": payload}],
    )
    return {"cache": "miss", **payload}