Source code for veupath_chatbot.services.research.web_search

"""Web search service using DuckDuckGo."""

import asyncio
import re
from typing import cast

import httpx

from veupath_chatbot.domain.research.citations import ensure_unique_citation_tags
from veupath_chatbot.platform.types import JSONArray, JSONObject, JSONValue
from veupath_chatbot.services.research.clients._base import make_citation
from veupath_chatbot.services.research.utils import (
    BROWSER_USER_AGENT,
    candidate_queries,
    decode_ddg_redirect,
    fetch_page_summary,
    looks_blocked,
    strip_tags,
)


class WebSearchService:
    """Service for web search using DuckDuckGo HTML interface.

    Results are scraped from the HTML endpoint with regular expressions,
    optionally enriched with page summaries, and returned together with one
    citation per result.
    """

    # Title anchor of a result; group 1 is the href, group 2 the inner HTML.
    # DOTALL lets the title span several lines of markup.
    _RESULT_LINK_RE = re.compile(
        r'class="result__a"[^>]*href="([^"]+)"[^>]*>(.*?)</a>',
        flags=re.IGNORECASE | re.DOTALL,
    )
    # Snippet element matched through its *own* closing tag (backreference to
    # the tag name), so inline markup such as <b>...</b> inside the snippet
    # does not cut the captured text short.
    _SNIPPET_RE = re.compile(
        r'<(\w+)\b[^>]*class="result__snippet"[^>]*>(.*?)</\1\s*>',
        flags=re.IGNORECASE | re.DOTALL,
    )
    # Looser pattern used only when the balanced form finds nothing (for
    # example when the closing tag lies outside the search window).
    _SNIPPET_FALLBACK_RE = re.compile(
        r'class="result__snippet"[^>]*>(.*?)</',
        flags=re.IGNORECASE | re.DOTALL,
    )
    # How far past a title link a snippet is searched for.
    _SNIPPET_WINDOW_CHARS = 2000

    def __init__(self, *, timeout_seconds: float = 15.0) -> None:
        """Store the HTTP timeout (in seconds) used for outgoing requests."""
        self._timeout = timeout_seconds

    @staticmethod
    def _browser_headers() -> dict[str, str]:
        """Return a fresh set of browser-like request headers."""
        return {
            "User-Agent": BROWSER_USER_AGENT,
            "Accept": (
                "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
            ),
            "Accept-Language": "en-US,en;q=0.9",
        }

    async def search(
        self,
        query: str,
        limit: int = 5,
        *,
        include_summary: bool = False,
        summary_max_chars: int = 600,
    ) -> JSONObject:
        """Search the web and return results with citations.

        Args:
            query: Search text; surrounding whitespace is stripped.
            limit: Maximum number of results, clamped to the range 1..10.
            include_summary: When true, fetch a summary for every result page.
            summary_max_chars: Summary length cap, clamped to 200..4000.

        Returns:
            A payload with ``query``, ``effectiveQuery``, ``searchAdjusted``,
            ``searchDiagnostics``, ``results`` and ``citations``. An ``error``
            key is added with ``"query_required"`` for an empty query, or
            ``"search_blocked"`` when nothing was found and at least one
            attempt looked blocked.
        """
        q = (query or "").strip()
        if not q:
            return {"results": [], "citations": [], "error": "query_required"}

        limit = max(1, min(int(limit or 5), 10))
        summary_max_chars = max(200, min(int(summary_max_chars or 600), 4000))

        results, effective_query, diag = await self._ddg_html_search(q, limit=limit)

        if include_summary and results:
            await self._attach_summaries(results, max_chars=summary_max_chars)

        citations = self._build_citations(results)

        payload: JSONObject = {
            "query": q,
            "effectiveQuery": effective_query,
            "searchAdjusted": effective_query != q,
            "searchDiagnostics": diag,
            "results": results,
            "citations": cast(JSONValue, citations),
        }
        if not results and isinstance(diag, dict) and diag.get("blocked") is True:
            payload["error"] = "search_blocked"
        return payload

    async def _attach_summaries(self, results: JSONArray, *, max_chars: int) -> None:
        """Fetch page summaries concurrently and store them on each result.

        Every dict result gets a ``summary`` key (``None`` when no usable text
        came back). A missing or very short snippet (under 40 characters after
        stripping) is replaced by the summary.
        """
        dict_results = [r for r in results if isinstance(r, dict)]
        async with httpx.AsyncClient(
            timeout=min(self._timeout, 15.0),
            headers=self._browser_headers(),
        ) as client:
            # return_exceptions=True: one failing page must not abort the rest;
            # a failure surfaces as a non-string item and is treated as
            # "no summary" below.
            summaries = await asyncio.gather(
                *[
                    fetch_page_summary(
                        client,
                        r.get("url"),
                        max_chars=max_chars,
                    )
                    for r in dict_results
                ],
                return_exceptions=True,
            )

        for r, s in zip(dict_results, summaries, strict=True):
            summary = s.strip() if isinstance(s, str) and s.strip() else None
            r["summary"] = cast(JSONValue, summary)
            snip = r.get("snippet")
            snippet_too_short = (not isinstance(snip, str)) or len(snip.strip()) < 40
            if snippet_too_short and summary:
                r["snippet"] = cast(JSONValue, summary)

    @staticmethod
    def _build_citations(results: JSONArray) -> list[JSONObject]:
        """Build one web citation per dict result and de-duplicate their tags."""
        citations: list[JSONObject] = []
        for item in results:
            if not isinstance(item, dict):
                continue
            title_raw = item.get("title")
            url_raw = item.get("url")
            url = url_raw if isinstance(url_raw, str) else None
            # Title falls back to the URL, then to a generic label.
            if isinstance(title_raw, str):
                title = title_raw
            elif url is not None:
                title = url
            else:
                title = "Web result"
            snippet_raw = item.get("summary") or item.get("snippet")
            snippet = snippet_raw if isinstance(snippet_raw, str) else None
            citations.append(
                make_citation(
                    source="web",
                    id_prefix="web",
                    title=title,
                    url=url,
                    snippet=snippet,
                )
            )
        ensure_unique_citation_tags(citations)
        return citations

    @classmethod
    def _parse_results(cls, html: str, limit: int) -> JSONArray:
        """Extract up to ``limit`` results (title, url, snippet) from DDG HTML."""
        parsed: JSONArray = []
        links = list(cls._RESULT_LINK_RE.finditer(html))
        for idx, link in enumerate(links):
            if len(parsed) >= limit:
                break
            title = strip_tags(link.group(2))
            if not title:
                continue

            # Snippets follow the title link. Stop the search window at the
            # next result link so a result without a snippet does not pick up
            # its neighbour's snippet.
            window_end = link.end() + cls._SNIPPET_WINDOW_CHARS
            if idx + 1 < len(links):
                window_end = min(window_end, links[idx + 1].start())
            window = html[link.end() : window_end]

            balanced = cls._SNIPPET_RE.search(window)
            if balanced:
                snippet_html = balanced.group(2)
            else:
                loose = cls._SNIPPET_FALLBACK_RE.search(window)
                snippet_html = loose.group(1) if loose else ""
            snippet = strip_tags(snippet_html) or None

            parsed.append(
                {
                    "title": title,
                    "url": decode_ddg_redirect(link.group(1)),
                    "snippet": snippet,
                }
            )
        return parsed

    async def _ddg_html_search(
        self, q: str, *, limit: int
    ) -> tuple[JSONArray, str, JSONObject]:
        """Perform DuckDuckGo HTML search with fallback query variations.

        Each candidate from ``candidate_queries(q)`` is tried in order until
        one yields parsed results.

        Returns:
            ``(results, effective_query, diagnostics)``. ``effective_query``
            is the candidate that produced results, or ``q`` when none did.
            ``diagnostics`` holds ``blocked`` (true once any response looked
            blocked), ``attempts`` and ``statusCodes``.

        Errors raised by the HTTP client are not caught here and propagate to
        the caller.
        """
        url = "https://html.duckduckgo.com/html/"

        # The list is shared with ``diag`` so appends are visible to callers.
        status_codes: list[int] = []
        diag: JSONObject = {
            "blocked": False,
            "attempts": 0,
            "statusCodes": cast(JSONValue, status_codes),
        }

        async with httpx.AsyncClient(
            timeout=self._timeout, headers=self._browser_headers()
        ) as client:
            for attempt, cand in enumerate(candidate_queries(q), start=1):
                resp = await client.get(url, params={"q": cand}, follow_redirects=True)
                diag["attempts"] = attempt
                status_codes.append(resp.status_code)

                body = resp.text or ""
                if looks_blocked(resp.status_code, body):
                    diag["blocked"] = True
                    continue

                results = self._parse_results(body, limit)
                if results:
                    return results, cand, diag

        # Every non-blocked response was already parsed above and produced
        # nothing, so there is nothing left to re-parse.
        return [], q, diag