# app/ingest.py from __future__ import annotations import json from pathlib import Path from typing import Dict, List, Any, Tuple, Optional import yaml import numpy as np from sentence_transformers import SentenceTransformer from app.paths import DOCSTORE_DIR, INDEX_DIR from .normalize import normalize # ← central normalizer import re import time import hashlib import requests from bs4 import BeautifulSoup # -------------------- Config -------------------- def load_config(cfg_path: str) -> Dict: with open(cfg_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) # -------------------- Capacity / Geo Filters (config-driven) -------------------- # controls live in config/sources.yaml: # filters: # capacity_only: true # pa_md_only: false _INCLUDE_PATTERNS = [re.compile(p, re.I) for p in [ r"\bcapacity(?:[-\s]?building)?\b", r"\btechnical\s+assistance\b", r"\bTA\b", r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b", r"\borganization(?:al)?\s+infrastructure\b", r"\bback[-\s]?office\b|\bbackbone\s+organization\b", r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b", r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b", r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b", r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b", r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b", r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b", ]] _EXCLUDE_PATTERNS = [re.compile(p, re.I) for p in [ r"\bteaching\s+assistant\b|\bTAs\b", r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)", r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)", r"\b(postsecondary|university|college)\b(?!.*community\s+partner)", r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))", ]] _PA_MD_HINTS = re.compile( r"\b(" r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|" r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b" r")\b", re.I, ) def _doc_text_from_row(rec: Dict[str, Any]) -> str: title = rec.get("title") or "" synopsis = rec.get("synopsis") or rec.get("summary") or "" agency = rec.get("agency") or "" eligibility = rec.get("eligibility") or "" categories = " ".join(rec.get("categories") or []) if isinstance(rec.get("categories"), list) else (rec.get("categories") or "") geo = rec.get("geo") or "" return "\n".join([title, synopsis, agency, eligibility, categories, geo]).strip() def _is_capacity_building_text(text: str) -> bool: if not text: return False if any(p.search(text) for p in _EXCLUDE_PATTERNS): return False return any(p.search(text) for p in _INCLUDE_PATTERNS) def _is_pa_md_text(text: str) -> bool: if not text: return False return bool(_PA_MD_HINTS.search(text)) # -------------------- Grants.gov collector -------------------- def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]: """ Calls the Grants.gov Search2 client and returns a list of RAW dicts (adapter may already be close to unified; we'll still run normalize()). """ from app.sources.grantsgov_api import search_grants # local import to avoid cycles api = src.get("api", {}) page_size = int(api.get("page_size", src.get("page_size", 100))) max_pages = int(api.get("max_pages", src.get("max_pages", 5))) payload = api.get("payload", src.get("payload", {})) url = src.get("url", "") out = search_grants(url, payload, page_size=page_size, max_pages=max_pages) hits = out.get("hits", []) if isinstance(out, dict) else (out or []) return [h for h in hits if isinstance(h, dict)] # -------------------- NEW: Generic HTML / PDF collectors -------------------- _HTTP_HEADERS = { "User-Agent": "grants-rag/1.0 (+https://example.local) requests", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", } def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]: try: r = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout) if r.status_code == 200 and r.content: return r except requests.RequestException: return None return None def _soup(html: str) -> BeautifulSoup: # use lxml or html5lib if available for robustness return BeautifulSoup(html, "lxml") def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]: """ Returns (title, text). Uses selectors if provided; falls back to common content containers. """ title = s.title.string.strip() if s.title and s.title.string else "" nodes = [] if selectors: for css in selectors: nodes.extend(s.select(css) or []) if not nodes: for css in ("main", "article", "#content", ".content", "[role='main']"): nodes.extend(s.select(css) or []) if not nodes: nodes = [s.body] if s.body else [] parts: List[str] = [] for n in nodes: if not n: continue txt = n.get_text(separator="\n", strip=True) if txt: parts.append(txt) body = "\n\n".join(parts).strip() return title, body def _make_id(*fields: str) -> str: h = hashlib.sha1() for f in fields: if f: h.update(f.encode("utf-8", "ignore")) h.update(b"|") return h.hexdigest() def _normalize_web_record( source_name: str, url: str, title: str, body: str, static: Dict[str, Any], extra: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Produce a record shaped like normalize() output so downstream stays unchanged. """ rec = { "id": (extra or {}).get("id") or _make_id(url, title or body[:160]), "title": title or (extra.get("title") if extra else "") or url, "synopsis": body[:2000], # clip; embeddings use title+synopsis later "summary": None, "url": url, "source": source_name, "geo": static.get("geo"), "categories": static.get("categories"), "agency": (extra or {}).get("agency", ""), "eligibility": (extra or {}).get("eligibility", ""), "deadline": (extra or {}).get("deadline"), "program_number": (extra or {}).get("program_number"), "posted_date": (extra or {}).get("posted_date"), } return rec def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]: """ Supports types: 'web_page' and 'http_html' Config keys supported: - url (str) - parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] } - crawl: { schedule: "...", max_depth: int } # max_depth 0/None = only landing """ url = entry.get("url") if not url: return [] r = _http_get(url) if not r: return [] s = _soup(r.text) parse = entry.get("parse", {}) or entry.get("extract", {}) or {} content_selectors = parse.get("content_selectors") or [] title, body = _text_from_soup(s, content_selectors) rows = [] rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})) # follow links? follow = bool(parse.get("follow_links")) link_selectors = parse.get("link_selectors") or [] crawl = entry.get("crawl", {}) or {} max_depth = int(crawl.get("max_depth", 0) or 0) visited = set([url]) def _enq_links(soup: BeautifulSoup) -> List[str]: if link_selectors: links = [] for sel in link_selectors: for a in soup.select(sel) or []: href = a.get("href") if href and href.startswith("http"): links.append(href) out, seen = [], set() for h in links: if h not in seen: out.append(h) seen.add(h) return out[:40] # polite cap return [] if follow and max_depth > 0: frontier = _enq_links(s) depth = 1 while frontier and depth <= max_depth and len(rows) < 200: next_frontier = [] for link in frontier: if link in visited: continue visited.add(link) rr = _http_get(link) if not rr: continue ss = _soup(rr.text) t2, b2 = _text_from_soup(ss, content_selectors) if b2: rows.append(_normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None})) if depth < max_depth: next_frontier.extend(_enq_links(ss)) time.sleep(0.1) # gentle frontier = next_frontier depth += 1 return rows def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]: """ type: 'http_pdf' keys: - url (single PDF fetch) """ url = entry.get("url") if not url: return [] try: from pdfminer.high_level import extract_text # lazy import except Exception: return [] rows = [] r = _http_get(url, timeout=40) if not r: return rows tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf") try: DOCSTORE_DIR.mkdir(parents=True, exist_ok=True) tmp.write_bytes(r.content) body = extract_text(str(tmp)) or "" finally: try: tmp.unlink(missing_ok=True) except Exception: pass title = entry.get("name") or "PDF Document" if body.strip(): rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})) return rows # -------------------- Write docstore & build index -------------------- def _save_docstore(recs: List[Dict[str, Any]]) -> str: DOCSTORE_DIR.mkdir(parents=True, exist_ok=True) path = DOCSTORE_DIR / "docstore.jsonl" with path.open("w", encoding="utf-8") as f: for r in recs: f.write(json.dumps(r, ensure_ascii=False) + "\n") return str(path) def _build_index_from_docstore() -> int: ds_path = DOCSTORE_DIR / "docstore.jsonl" if not ds_path.exists(): raise RuntimeError("Docstore not found. Run ingest first.") texts: List[str] = [] metas: List[Dict[str, Any]] = [] with ds_path.open("r", encoding="utf-8") as f: for line in f: rec = json.loads(line) title = rec.get("title") or "" synopsis = rec.get("synopsis") or rec.get("summary") or "" agency = rec.get("agency") or "" eligibility = rec.get("eligibility") or "" txt = "\n".join([title, synopsis, agency, eligibility]).strip() if not txt: continue texts.append(txt) metas.append({ "id": rec.get("id"), "title": title, "url": rec.get("url"), "source": rec.get("source"), "geo": rec.get("geo"), "categories": rec.get("categories"), "agency": agency, "deadline": rec.get("deadline"), "program_number": rec.get("program_number"), "posted_date": rec.get("posted_date"), }) print(f"[index] Rows loaded from docstore: {len(texts)}") if not texts: INDEX_DIR.mkdir(parents=True, exist_ok=True) (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False)) print("[index] No texts to embed. Wrote empty meta.json.") return 0 # Embed (CPU default; portable) model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") model.max_seq_length = 256 batch = max(8, min(32, len(texts))) emb = model.encode( texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=True, batch_size=batch, ).astype(np.float32, copy=False) # FAISS index (Inner Product for cosine on normalized vectors) import faiss dim = emb.shape[1] index = faiss.IndexFlatIP(dim) index.add(emb) INDEX_DIR.mkdir(parents=True, exist_ok=True) faiss.write_index(index, str(INDEX_DIR / "faiss.index")) (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False)) print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).") return len(texts) # -------------------- Public API: ingest -------------------- __all__ = ["ingest"] def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None): """ Reads config, fetches from enabled sources via adapters, normalizes to a single schema, applies filters (capacity / PA-MD), dedupes, writes docstore, and builds the FAISS index. Returns (docstore_path, n_indexed). """ cfg = load_config(cfg_path) # ---- Filters from config ---- f_cfg = (cfg or {}).get("filters", {}) or {} capacity_only = bool(f_cfg.get("capacity_only", True)) pa_md_only = bool(f_cfg.get("pa_md_only", False)) print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}") print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}") all_rows: List[Dict[str, Any]] = [] for entry in cfg.get("sources", []): if not entry.get("enabled"): continue name = entry.get("name", "") geo = entry.get("geo") or "US" cats = entry.get("categories") or [] static = {"geo": geo, "categories": cats} typ = entry.get("type") rows: List[Dict[str, Any]] = [] if typ == "grantsgov_api": raw_hits = _collect_from_grantsgov_api(entry) rows = [normalize("grants_gov", h, static) for h in raw_hits] elif typ in ("web_page", "http_html"): rows = _collect_from_http_html(entry, name, static) elif typ == "http_pdf": rows = _collect_from_http_pdf(entry, name, static) elif typ == "local_sample": p = Path(entry["path"]).expanduser() blob = json.loads(p.read_text(encoding="utf-8")) items = blob.get("opportunities") or [] rows = [normalize("local_sample", op, static) for op in items] # Unknown types => skip silently # ---- Apply capacity / geo filters BEFORE collecting ---- if rows and (capacity_only or pa_md_only): filtered = [] for r in rows: t = _doc_text_from_row(r) if capacity_only and not _is_capacity_building_text(t): continue if pa_md_only and not _is_pa_md_text(t): continue filtered.append(r) print(f"[filter] {name}: kept {len(filtered)}/{len(rows)} after filters") rows = filtered print(f"[collect] {name} → {len(rows)} rows") all_rows.extend(rows) # ---- DEDUPE (by id → url → title) ---- seen, unique = set(), [] for r in all_rows: key = r.get("id") or r.get("url") or r.get("title") if not key or key in seen: continue seen.add(key) unique.append(r) print(f"[ingest] Unique records to index: {len(unique)}") path = _save_docstore(unique) n = _build_index_from_docstore() return path, n # -------------------- CLI -------------------- if __name__ == "__main__": import argparse ap = argparse.ArgumentParser() ap.add_argument("--config", default="config/sources.yaml") args = ap.parse_args() p, n = ingest(args.config) print(f"Ingested {n} records. Docstore at {p}")