Source code for veupath_chatbot.integrations.vectorstore.dependent_vocab_cache
import time
from veupath_chatbot.integrations.embeddings.openai_embeddings import embed_one
from veupath_chatbot.integrations.vectorstore.bootstrap import get_embedding_dim
from veupath_chatbot.integrations.vectorstore.collections import (
WDK_DEPENDENT_VOCAB_CACHE_V1,
)
from veupath_chatbot.integrations.vectorstore.qdrant_store import (
QdrantStore,
context_hash,
point_uuid,
)
from veupath_chatbot.integrations.veupathdb.client import (
encode_context_param_values_for_wdk,
)
from veupath_chatbot.integrations.veupathdb.factory import get_wdk_client
from veupath_chatbot.platform.config import get_settings
from veupath_chatbot.platform.errors import WDKError
from veupath_chatbot.platform.types import JSONObject, JSONValue
[docs]
async def ensure_dependent_vocab_collection(store: QdrantStore) -> None:
    """Make sure the dependent-vocab cache collection exists in ``store``.

    Entries in this collection are fetched by key
    (site/rt/search/param/contextHash). Vectors are stored anyway so the
    Qdrant schema stays consistent and similarity search remains an option
    later.

    Args:
        store: Store whose ``ensure_collection`` is awaited with the cache
            collection name and the embedding dimension of the configured
            embeddings model.
    """
    embeddings_model = get_settings().embeddings_model
    vector_size = await get_embedding_dim(embeddings_model)
    await store.ensure_collection(
        name=WDK_DEPENDENT_VOCAB_CACHE_V1,
        vector_size=vector_size,
    )
[docs]
async def get_dependent_vocab_authoritative_cached(
    *,
    site_id: str,
    record_type: str,
    search_name: str,
    param_name: str,
    context_values: JSONObject,
    store: QdrantStore | None = None,
) -> JSONObject:
    """Return authoritative dependent vocab, cached in Qdrant.

    - Cache key is the *WDK-wire* encoded context values (json-string encoding
      for lists/dicts), combined with site, record type, search and param name.
    - On cache miss, calls WDK ``/refreshed-dependent-params`` (via the
      existing client) and stores the result.
    - If the site-specific request raises ``WDKError`` and ``site_id`` is not
      ``"veupathdb"``, the request is retried once against the ``"veupathdb"``
      client. ``sourceUrl`` in the stored payload reflects whichever client
      actually produced the response.

    Args:
        site_id: Site whose WDK client is tried first; also part of the cache key.
        record_type: Record type segment of the WDK search.
        search_name: Search name segment of the WDK search.
        param_name: Parameter whose dependent vocabulary is requested.
        context_values: Context parameter values; a falsy value is treated as
            an empty mapping before encoding.
        store: Store to read from / write to. When falsy, one is built with
            ``QdrantStore.from_settings()``.

    Returns:
        The cached or freshly built payload merged with a ``"cache"`` entry of
        ``"hit"`` or ``"miss"``.

    Raises:
        WDKError: When the request fails and ``site_id`` is ``"veupathdb"``
            (no fallback is attempted). Errors raised by the fallback request
            propagate unchanged.
    """
    store = store or QdrantStore.from_settings()
    await ensure_dependent_vocab_collection(store)

    wdk_context = encode_context_param_values_for_wdk(context_values or {})
    ch = context_hash(wdk_context)
    key = f"{site_id}:{record_type}:{search_name}:{param_name}:{ch}"
    pid = point_uuid(key)

    cached = await store.get(collection=WDK_DEPENDENT_VOCAB_CACHE_V1, point_id=pid)
    if cached:
        payload_value = cached.get("payload")
        # A cached point without a dict payload is treated as a miss and refetched.
        if isinstance(payload_value, dict):
            payload_dict: JSONObject = {str(k): v for k, v in payload_value.items()}
            return {"cache": "hit", **payload_dict}

    client = get_wdk_client(site_id)
    try:
        response = await client.get_refreshed_dependent_params(
            record_type, search_name, param_name, wdk_context
        )
    except WDKError:
        if site_id == "veupathdb":
            raise
        # Rebind ``client`` so the provenance recorded below (``sourceUrl``)
        # points at the portal that actually served the response rather than
        # at the site whose request failed.
        client = get_wdk_client("veupathdb")
        response = await client.get_refreshed_dependent_params(
            record_type, search_name, param_name, wdk_context
        )

    payload: JSONObject = {
        # ``siteId`` stays the requested site: it is part of the cache key.
        "siteId": site_id,
        "recordType": record_type,
        "searchName": search_name,
        "paramName": param_name,
        "contextParamValues": wdk_context,
        "contextHash": ch,
        "wdkResponse": response,
        "ingestedAt": int(time.time()),
        "sourceUrl": f"{client.base_url}/record-types/{record_type}/searches/{search_name}/refreshed-dependent-params",
    }

    # Minimal vector (not used for correctness; just to satisfy collection vector config)
    vec = await embed_one(
        text=f"{site_id} {record_type} {search_name} {param_name}",
        model=get_settings().embeddings_model,
    )
    # Convert list[float] to list[JSONValue] for type compatibility
    vec_json: list[JSONValue] = [float(x) for x in vec]

    await store.upsert(
        collection=WDK_DEPENDENT_VOCAB_CACHE_V1,
        points=[{"id": pid, "vector": vec_json, "payload": payload}],
    )
    return {"cache": "miss", **payload}