Source code for veupath_chatbot.services.strategies.auto_push
"""Best-effort auto-push: sync a local strategy back to VEuPathDB WDK.
Called after strategy mutations (CRUD updates, chat-driven changes) when
the strategy has a ``wdk_strategy_id``. Failures are logged but never
propagate — the local save is the source of truth.
IMPORTANT: this runs as a background ``asyncio.Task``, **not** inside the
request's DB session. It creates its own session to avoid the
``asyncpg InterfaceError: another operation is in progress`` that occurs
when a fire-and-forget task shares a request-scoped connection.
"""
import asyncio
from uuid import UUID
from veupath_chatbot.domain.strategy.compile import compile_strategy
from veupath_chatbot.integrations.veupathdb.factory import get_strategy_api
from veupath_chatbot.persistence.repositories.stream import StreamRepository
from veupath_chatbot.persistence.session import async_session_factory
from veupath_chatbot.platform.errors import WDKError
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.services.catalog.searches import make_record_type_resolver
from veupath_chatbot.services.strategies.plan_validation import validate_plan_or_raise
logger = get_logger(__name__)
# Per-strategy lock to prevent concurrent auto-pushes from racing on
# the same DB row / WDK strategy.
_push_locks: dict[UUID, asyncio.Lock] = {}
_PUSH_LOCKS_MAX = 200
def _get_push_lock(strategy_id: UUID) -> asyncio.Lock:
"""Get or create a per-strategy lock for serialising WDK push operations.
:param strategy_id: Strategy UUID.
:returns: Asyncio lock for the given strategy.
"""
if strategy_id not in _push_locks:
# Evict oldest *unlocked* entries to bound memory.
if len(_push_locks) >= _PUSH_LOCKS_MAX:
to_evict: UUID | None = None
for candidate in _push_locks:
if not _push_locks[candidate].locked():
to_evict = candidate
break
if to_evict is not None:
del _push_locks[to_evict]
_push_locks[strategy_id] = asyncio.Lock()
return _push_locks[strategy_id]
[docs]
async def try_auto_push_to_wdk(
strategy_id: UUID,
) -> None:
"""Push a strategy to WDK if it has a ``wdk_strategy_id``.
Reads from the CQRS projection (stream_projections table).
This is a best-effort operation — any error is logged and swallowed.
If the WDK strategy no longer exists (404), the stale
``wdk_strategy_id`` is cleared so future pushes don't keep failing.
"""
lock = _get_push_lock(strategy_id)
if lock.locked():
# Another push for this strategy is already in flight — skip.
logger.debug(
"Auto-push skipped (already in progress)",
strategy_id=str(strategy_id),
)
return
# Use an independent DB session so we don't contend with the
# request-scoped session that fired this background task.
async with lock, async_session_factory() as session:
try:
repo = StreamRepository(session)
projection = await repo.get_projection(strategy_id)
if not projection or not projection.wdk_strategy_id:
return
site_id = projection.site_id
if not site_id:
return
plan = projection.plan if isinstance(projection.plan, dict) else {}
if not plan:
return
strategy_ast = validate_plan_or_raise(plan)
api = get_strategy_api(site_id)
resolver = await make_record_type_resolver(site_id)
result = await compile_strategy(
strategy_ast, api, site_id=site_id, resolve_search_record_type=resolver
)
await api.update_strategy(
strategy_id=projection.wdk_strategy_id,
step_tree=result.step_tree,
name=projection.name,
)
# Rewrite local IDs to WDK IDs in the persisted plan.
compiled_map = {s.local_id: s.wdk_step_id for s in result.steps}
all_steps = strategy_ast.get_all_steps()
unmapped = [s.id for s in all_steps if s.id not in compiled_map]
if unmapped:
logger.warning(
"Auto-push: some steps missing from compiled map, "
"skipping ID rewrite to avoid mixed-ID corruption",
strategy_id=str(strategy_id),
unmapped_step_ids=unmapped,
)
else:
for step in all_steps:
step.id = str(compiled_map[step.id])
updated_plan = strategy_ast.to_dict()
await repo.update_projection(
strategy_id,
plan=updated_plan,
record_type=strategy_ast.record_type,
step_count=len(strategy_ast.get_all_steps()),
)
await session.commit()
logger.info(
"Auto-pushed strategy to WDK",
strategy_id=str(strategy_id),
wdk_strategy_id=projection.wdk_strategy_id,
)
except WDKError as e:
await session.rollback()
if e.status == 404:
# The WDK strategy no longer exists — clear the stale
# reference so we stop retrying on every save.
logger.warning(
"WDK strategy no longer exists, clearing wdk_strategy_id",
strategy_id=str(strategy_id),
)
try:
repo = StreamRepository(session)
await repo.update_projection(
strategy_id,
wdk_strategy_id=None,
wdk_strategy_id_set=True,
)
await session.commit()
except Exception:
await session.rollback()
logger.error(
"Failed to clear stale wdk_strategy_id",
strategy_id=str(strategy_id),
)
else:
logger.warning(
"Auto-push to WDK failed (best-effort)",
strategy_id=str(strategy_id),
error=str(e),
)
except Exception as e:
await session.rollback()
logger.warning(
"Auto-push to WDK failed (best-effort)",
strategy_id=str(strategy_id),
error=str(e),
)