"""Utility functions for research services."""
import html
import re
from difflib import SequenceMatcher
from urllib.parse import parse_qs, unquote, urlparse
import httpx
from veupath_chatbot.platform.logging import get_logger
from veupath_chatbot.platform.types import JSONObject, JSONValue
# Module-level logger, named after this module.
logger = get_logger(__name__)
# User-Agent string identifying as desktop Chrome on macOS. It is not
# referenced in this part of the file — presumably it is sent by the HTTP
# clients that run the searches (TODO confirm against callers).
BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
[docs]
def norm_text(value: str | None) -> str:
"""Normalize text for comparison.
:param value: Text to normalize.
:returns: Normalized string.
"""
return (value or "").strip().lower()
[docs]
def list_str(value: JSONValue) -> list[str]:
    """Coerce a JSON list into a list of strings.

    :param value: Value to process.
    :returns: ``str()`` of every non-``None`` element when ``value`` is a
        list; an empty list for any other kind of value.
    """
    if not isinstance(value, list):
        return []
    return [str(entry) for entry in value if entry is not None]
[docs]
def limit_authors(authors: list[str] | None, max_authors: int) -> list[str] | None:
"""Limit the number of authors, appending 'et al.' if truncated.
:param authors: Author list.
:param max_authors: Maximum number of authors (-1 for no limit).
:returns: Truncated list or None.
"""
if not isinstance(authors, list) or not authors:
return None
cleaned = [str(a) for a in authors if a is not None and str(a).strip()]
if not cleaned:
return None
if max_authors == -1:
return cleaned
n = int(max_authors)
if n <= 0:
return ["et al."]
if len(cleaned) <= n:
return cleaned
return cleaned[:n] + ["et al."]
[docs]
def truncate_text(text: str | None, max_chars: int) -> str | None:
"""Truncate text to max_chars, appending ellipsis if truncated.
:param text: Text to truncate.
:param max_chars: Maximum character count.
:returns: Truncated string or None.
"""
if not isinstance(text, str):
return None
t = text.strip()
if not t:
return None
if len(t) <= max_chars:
return t
return t[: max_chars - 1].rstrip() + "…"
[docs]
def decode_ddg_redirect(href: str) -> str:
    """Decode DuckDuckGo redirect URLs.

    DuckDuckGo result links have the form
    ``//duckduckgo.com/l/?uddg=<percent-encoded target>``. When ``href`` is
    such a link, the target URL is returned; any other input is returned
    stripped (with ``https:`` prepended to protocol-relative URLs).

    :param href: Redirect URL.
    :returns: Decoded target URL, or the cleaned-up input when it is not a
        DuckDuckGo redirect or cannot be parsed.
    """
    candidate = (href or "").strip()
    if not candidate:
        return candidate
    if candidate.startswith("//"):
        candidate = "https:" + candidate
    try:
        parsed = urlparse(candidate)
        # Compare the hostname exactly (or as a subdomain) so look-alike hosts
        # such as "notduckduckgo.com" are not treated as redirects.
        host = (parsed.hostname or "").lower()
        is_ddg = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
        if is_ddg and parsed.path.startswith("/l/"):
            targets = parse_qs(parsed.query or "").get("uddg")
            if targets and targets[0]:
                # parse_qs has already percent-decoded the value; decoding it
                # again would corrupt escapes that belong to the target URL
                # (e.g. "%2520" must become "%20", not a space).
                return targets[0]
    except ValueError as exc:
        # urlparse/parse_qs signal malformed input with ValueError.
        logger.debug("Failed to decode DDG redirect URL", error=str(exc))
    return candidate
# Lower-case words that candidate_queries() removes (case-insensitively) when
# it builds the "filtered" variant of a search query.
_LOW_VALUE_QUERY_TOKENS = {
    "biography",
    "bio",
    "wikipedia",
    "parasitologist",
    "profile",
    "about",
    "department",
    "university",
}
[docs]
def candidate_queries(q: str) -> list[str]:
    """Build progressively simpler variants of a query for fallback searches.

    The variants are, in order: the stripped query itself, the query without
    its last word (only for three or more words), the query without low-value
    tokens (only if at least two words remain), and the first two words.
    Duplicates are dropped, keeping the first occurrence.

    :param q: Search query.
    :returns: Candidate query variations (empty for a blank query).
    """
    query = (q or "").strip()
    if not query:
        return []

    tokens = query.split()
    proposals = [query]

    if len(tokens) > 2:
        proposals.append(" ".join(tokens[:-1]))

    informative = [
        tok for tok in tokens if tok.lower() not in _LOW_VALUE_QUERY_TOKENS
    ]
    if len(informative) >= 2:
        proposals.append(" ".join(informative))

    if len(tokens) >= 2:
        proposals.append(" ".join(tokens[:2]))

    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(proposals))
[docs]
def looks_blocked(status_code: int, html: str) -> bool:
    """Heuristically decide whether a search response was blocked.

    A 202 status always counts as blocked. Otherwise the body counts as
    blocked when it mentions "challenge" or "unusual traffic" and contains no
    ``result__a`` marker (all compared case-insensitively).

    :param status_code: HTTP status code.
    :param html: Response HTML body.
    :returns: True if response looks blocked.
    """
    if status_code == 202:
        return True
    body = (html or "").lower()
    # A body that contains result markup is never treated as blocked.
    if "result__a" in body:
        return False
    return "challenge" in body or "unusual traffic" in body
[docs]
def norm_for_match(text: str | None) -> str:
"""Normalize text for fuzzy matching.
:param text: Text to normalize.
:returns: Normalized string for matching.
"""
if not isinstance(text, str):
return ""
t = text.lower()
t = re.sub(r"\s+", " ", t).strip()
return t
[docs]
def fallback_ratio(a: str, b: str) -> float:
    """Compute a ``difflib``-based similarity percentage for two strings.

    :param a: First string.
    :param b: Second string.
    :returns: Similarity ratio (0-100); 0.0 when either string is empty.
    """
    if a and b:
        return SequenceMatcher(None, a, b).ratio() * 100.0
    return 0.0
[docs]
def fuzzy_score(query: str, text: str) -> float:
    """Score how closely ``text`` matches ``query``.

    Both inputs are normalized with ``norm_for_match``. ``rapidfuzz``'s
    ``token_set_ratio`` is used for the comparison; if importing or calling it
    raises, the score comes from ``fallback_ratio`` instead.

    :param query: Search query.
    :param text: Text to score.
    :returns: Fuzzy similarity score (0.0 when either input normalizes to an
        empty string).
    """
    needle = norm_for_match(query)
    haystack = norm_for_match(text)
    if not (needle and haystack):
        return 0.0
    try:
        # Imported lazily so a missing rapidfuzz falls back instead of failing.
        from rapidfuzz import fuzz

        similarity = fuzz.token_set_ratio(needle, haystack)
        return float(similarity)
    except Exception as exc:
        logger.debug("rapidfuzz unavailable, using fallback ratio", error=str(exc))
        return fallback_ratio(needle, haystack)
[docs]
def rerank_score(query: str, item: JSONObject) -> tuple[float, dict[str, float]]:
    """Compute a weighted relevance score for a literature search result.

    The title, the abstract (or ``snippet``) and the journal name (from
    ``journalTitle`` or ``journal``) are each scored against the query with
    ``fuzzy_score`` and combined with weights 0.70, 0.28 and 0.02.

    :param query: Search query.
    :param item: Literature result item.
    :returns: Tuple of (score, score breakdown) where the breakdown holds the
        unweighted ``title``, ``abstract`` and ``journal`` scores.
    """
    title_text = str(item.get("title") or "")
    abstract_text = str(item.get("abstract") or item.get("snippet") or "")
    journal_text = str(item.get("journalTitle") or item.get("journal") or "")

    breakdown = {
        "title": fuzzy_score(query, title_text),
        "abstract": fuzzy_score(query, abstract_text),
        # A missing journal contributes nothing and is not scored at all.
        "journal": fuzzy_score(query, journal_text) if journal_text else 0.0,
    }
    total = (
        0.70 * breakdown["title"]
        + 0.28 * breakdown["abstract"]
        + 0.02 * breakdown["journal"]
    )
    return total, breakdown
[docs]
def passes_filters(
*,
title: str,
authors: list[str] | None,
year: int | None,
doi: str | None,
pmid: str | None,
journal: str | None,
year_from: int | None,
year_to: int | None,
author_includes: str | None,
title_includes: str | None,
journal_includes: str | None,
doi_equals: str | None,
pmid_equals: str | None,
require_doi: bool,
) -> bool:
"""Check if a literature result passes all filters.
:param title: Result title.
:param authors: Author list.
:param year: Publication year.
:param doi: DOI.
:param pmid: PubMed ID.
:param journal: Journal name.
:param year_from: Minimum year filter.
:param year_to: Maximum year filter.
:param author_includes: Author substring filter.
:param title_includes: Title substring filter.
:param journal_includes: Journal substring filter.
:param doi_equals: Exact DOI filter.
:param pmid_equals: Exact PMID filter.
:param require_doi: Whether DOI is required.
:returns: True if result passes all filters.
"""
if year_from is not None and (year is None or year < year_from):
return False
if year_to is not None and (year is None or year > year_to):
return False
if require_doi and not (isinstance(doi, str) and doi.strip()):
return False
if doi_equals is not None and norm_text(doi) != norm_text(doi_equals):
return False
if pmid_equals is not None and norm_text(pmid) != norm_text(pmid_equals):
return False
if title_includes is not None and norm_text(title_includes) not in norm_text(title):
return False
if journal_includes is not None and norm_text(journal_includes) not in norm_text(
journal
):
return False
if author_includes is not None:
needle = norm_text(author_includes)
haystack = " ".join(norm_text(a) for a in (authors or []))
if needle not in haystack:
return False
return True
[docs]
def dedupe_key(item: JSONObject) -> str:
    """Build the key used to detect duplicate literature results.

    The first non-blank string among ``pmid``, ``doi`` and ``url`` wins and is
    used stripped and lower-cased. Without any of them the key falls back to
    the normalized title combined with the year.

    :param item: Item dict.
    :returns: Key of the form ``"<field>:<value>"`` or
        ``"title:<title>|year:<year>"``.
    """
    for field in ("pmid", "doi", "url"):
        identifier = item.get(field)
        if isinstance(identifier, str) and identifier.strip():
            return f"{field}:{identifier.strip().lower()}"

    title = item.get("title")
    year = item.get("year")
    return f"title:{norm_text(str(title))}|year:{year}"
_HEAD_LIMIT = 32 * 1024  # 32 KB — more than enough to capture <head>

# Quoted ``content="..."`` attribute. The lookbehinds bind the captured text to
# whichever quote character opened the value, so an apostrophe inside a
# double-quoted description (or a double quote inside a single-quoted one) no
# longer ends the capture early. Group 1 is always the attribute value.
_META_CONTENT = r'content=["\']((?<=")[^"]+|(?<=\')[^\']+)["\']'

# Description-bearing <meta> tags, in priority order: the standard description
# first, then Open Graph, then Twitter. Each pattern expects the identifying
# attribute to appear before ``content`` inside the tag.
_META_PATTERNS = [
    re.compile(rf"<meta[^>]+{selector}[^>]+{_META_CONTENT}", re.IGNORECASE)
    for selector in (
        r'name=["\']description["\']',
        r'property=["\']og:description["\']',
        r'name=["\']twitter:description["\']',
    )
]
def _extract_meta_description(text: str) -> str | None:
    """Return the tag-stripped content of the first meta pattern that matches.

    Patterns are tried in ``_META_PATTERNS`` order. Only the first pattern that
    matches is used: if its content is empty after stripping tags the result
    is ``None`` and later patterns are not consulted.
    """
    attempts = (pattern.search(text) for pattern in _META_PATTERNS)
    hit = next((found for found in attempts if found), None)
    if hit is None:
        return None
    return strip_tags(hit.group(1)) or None
def _extract_best_paragraph(text: str) -> str | None:
    """Pick the longest ``<p>`` body that looks like real content.

    Paragraphs are tag-stripped first. Those shorter than 60 characters or
    containing navigation boilerplate are ignored. On a length tie the
    earliest paragraph wins. Returns ``None`` when nothing qualifies.
    """

    def _is_content(paragraph: str) -> bool:
        if len(paragraph) < 60:
            return False
        lowered = paragraph.lower()
        return not ("toggle navigation" in lowered or "main navigation" in lowered)

    stripped = (
        strip_tags(inner)
        for inner in re.findall(
            r"<p[^>]*>(.*?)</p>", text, flags=re.IGNORECASE | re.DOTALL
        )
    )
    # max() returns the first of several equally long candidates.
    return max((p for p in stripped if _is_content(p)), key=len, default=None)
[docs]
async def fetch_page_summary(
    client: httpx.AsyncClient, url: JSONValue, *, max_chars: int
) -> str | None:
    """Fetch and extract a text summary from a web page.

    Streams the response and stops reading as soon as ``</head>`` is found or
    ``_HEAD_LIMIT`` characters have been buffered. Meta description tags are
    checked first; if none are present the longest ``<p>`` in the buffered
    content is used as a fallback.

    :param client: HTTP client used to stream the page.
    :param url: Page URL; anything that is not a non-blank string is rejected.
    :param max_chars: Maximum length of the returned summary.
    :returns: The summary, or ``None`` for PDF links (including ``.pdf``
        followed by a query string or fragment), Google Scholar links, pages
        without usable text, or any fetch error.
    """
    if not isinstance(url, str) or not url.strip():
        return None
    u = url.strip()
    lowered = u.lower()
    # Check the whole URL and the part before any query string or fragment, so
    # both "paper.pdf?download=1" and "get.php?file=paper.pdf" are skipped.
    path_part = lowered.split("#", 1)[0].split("?", 1)[0]
    if lowered.endswith(".pdf") or path_part.endswith(".pdf"):
        return None
    # Host names are case-insensitive, so match on the lower-cased URL.
    if "scholar.google." in lowered:
        return None

    buf = ""
    head_end = -1  # offset just past "</head>" in buf, or -1 if not seen
    try:
        async with client.stream(
            "GET",
            u,
            follow_redirects=True,
            headers={"Referer": "https://duckduckgo.com/"},
        ) as resp:
            resp.raise_for_status()
            async for chunk in resp.aiter_text():
                buf += chunk
                # Search the buffer itself rather than a lower-cased copy:
                # lower() can change the length of some Unicode text, which
                # would make the offset wrong for slicing.
                closing = re.search(r"</head>", buf, re.IGNORECASE)
                if closing:
                    head_end = closing.end()
                    break
                if len(buf) >= _HEAD_LIMIT:
                    break
    except Exception as exc:
        # Best effort: any network, HTTP-status or decoding failure simply
        # means there is no summary for this page.
        logger.debug("Failed to fetch page for summary extraction", error=str(exc))
        return None

    # --- Try meta descriptions (always in <head>) ---
    search_region = buf[:head_end] if head_end != -1 else buf
    desc = _extract_meta_description(search_region)
    if desc:
        return truncate_text(desc, max_chars)

    # --- Fallback: longest <p> from whatever we already buffered ---
    best = _extract_best_paragraph(buf)
    return truncate_text(best, max_chars) if best else None