"""Vector-store ingestion utilities.

Module: ``veupath_chatbot.integrations.vectorstore.ingest.utils``.
"""
from collections.abc import Sequence
from itertools import batched
from typing import Any, cast
from qdrant_client import AsyncQdrantClient
from veupath_chatbot.integrations.embeddings.openai_embeddings import OpenAIEmbeddings
from veupath_chatbot.integrations.vectorstore.qdrant_store import QdrantStore
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject, JSONValue
# Module-level logger from the platform logging helper; calls in this module
# pass contextual details as keyword fields (e.g. collection=..., error=...).
logger = get_logger(__name__)
[docs]
def parse_sites(value: str) -> list[str] | None:
"""Parse a comma-separated list of site IDs or 'all' → None (meaning all)."""
v = (value or "").strip()
if not v or v == "all":
return None
return [s.strip() for s in v.split(",") if s.strip()]
[docs]
async def existing_point_ids(
    *,
    qdrant_client: AsyncQdrantClient,
    collection: str,
    ids: list[str],
    chunk_size: int = 256,
) -> set[str]:
    """Report which of *ids* are already stored in *collection*.

    Ids are looked up in batches of ``chunk_size`` (clamped to at least 1) via
    ``retrieve`` with payloads and vectors disabled, so only point ids come back.

    :param qdrant_client: Async Qdrant client used for the lookups.
    :param collection: Name of the collection to probe.
    :param ids: Candidate point ids.
    :param chunk_size: Maximum number of ids sent per ``retrieve`` call.
    :returns: The ids found, stringified from the point ids Qdrant returns.
        Empty when *ids* is empty or the collection does not exist yet.
    :raises Exception: Any other ``retrieve`` failure is logged and re-raised.
    """
    found: set[str] = set()
    if not ids:
        return found
    batch_size = max(1, int(chunk_size))
    for batch in batched(ids, batch_size, strict=False):
        try:
            records = await qdrant_client.retrieve(
                collection_name=collection,
                ids=batch,
                with_payload=False,
                with_vectors=False,
            )
        except Exception as exc:
            message = str(exc)
            # A collection that has not been created yet is the normal
            # pre-ingestion state: nothing exists in it, so this is not an error.
            missing_collection = any(
                marker in message
                for marker in ("doesn't exist", "Not found: Collection")
            )
            if missing_collection:
                return set()
            logger.warning(
                "Qdrant retrieve failed during existence check",
                collection=collection,
                error=message,
                errorType=type(exc).__name__,
            )
            raise
        found.update(str(record.id) for record in records or [])
    return found
[docs]
async def embed_and_upsert(
    *,
    store: QdrantStore,
    embedder: OpenAIEmbeddings,
    collection: str,
    ids: Sequence[str | JSONValue],
    texts: list[str],
    payloads: Sequence[JSONObject],
) -> None:
    """Embed *texts*, pair with *ids*/*payloads*, and upsert to *collection*.

    This is the shared core of both WDK and public-strategy indexing pipelines.
    The i-th id, text and payload together describe one point.

    :param store: Vector store that receives the points.
    :param embedder: Embedding client; one vector is expected per text.
    :param collection: Target collection name.
    :param ids: Point ids, positionally aligned with *texts*.
    :param texts: Texts to embed. When empty, nothing is embedded or written.
    :param payloads: Point payloads, positionally aligned with *texts*.
    :raises ValueError: If *ids*, *texts* and *payloads* differ in length
        (checked before any embedding request is made), or if the embedder
        returns a different number of vectors than there are texts.
    """
    if not texts:
        return
    # Validate alignment before the embedding request, so that a caller bug
    # does not cost an API round-trip before being reported.
    n_texts = len(texts)
    if len(ids) != n_texts or len(payloads) != n_texts:
        msg = (
            "ids, texts and payloads must have the same length "
            f"(got ids={len(ids)}, texts={n_texts}, payloads={len(payloads)})"
        )
        raise ValueError(msg)
    vectors = await embedder.embed_texts(texts)
    # strict=True still guards against the embedder returning a vector count
    # that differs from the number of texts.
    points = [
        cast(Any, {"id": pid, "vector": vector, "payload": payload})
        for pid, vector, payload in zip(ids, vectors, payloads, strict=True)
    ]
    await store.upsert(collection=collection, points=points)