import scrapy
from scrapy.crawler import CrawlerProcess
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from supabase import create_client
from datetime import datetime
import os, re, tempfile

# Load environment variables from a local .env file before reading them.
load_dotenv()

SUPABASE_URL = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.environ.get("NEXT_PUBLIC_SUPABASE_SERVICE_KEY")
SUPABASE_BUCKET = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET", "pnp-bot-storage")
def is_valid_prodi(nama):
    """Return True if a menu label looks like a study-program name."""
    return bool(re.search(
        r'\b(D[2-4]|Diploma ?[2-4]|Magister|Sarjana Terapan|Teknologi Rekayasa)\b',
        nama, re.I
    ))

def normalize_nama(nama):
    """Strip the degree-level prefix and lowercase the program name."""
    return re.sub(
        r'\b(D[2-4]|Diploma ?[2-4]|Magister|Sarjana Terapan|Teknologi Rekayasa)\b',
        '', nama, flags=re.I
    ).strip().lower()
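# Illustrative behaviour of the helpers above (hypothetical inputs, not taken
# from the crawled sites); note that neither function is called by the spider yet:
#   is_valid_prodi("D3 Teknik Sipil")  -> True
#   is_valid_prodi("Berita Kampus")    -> False
#   normalize_nama("D3 Teknik Sipil")  -> "teknik sipil"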
def extract_program_subpages(soup, base_url):
    links = []
    ul_tags = soup.find_all("ul")
    for ul in ul_tags:
        for a in ul.find_all("a", href=True):
            text = a.get_text(strip=True)
            href = a["href"]
            if any(kata in text.lower() for kata in ["deskripsi", "visi", "kurikulum", "lulusan"]):
                full_url = href if href.startswith("http") else base_url + href
                links.append((text, full_url))
    return links
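# extract_program_subpages() collects (label, absolute URL) pairs for menu links
# whose text mentions "deskripsi", "visi", "kurikulum", or "lulusan". It is not
# wired into the spider below; a minimal sketch of how it could be used inside
# parse(), assuming `soup`, `domain`, and `jurusan` are already in scope there:
#
#   for label, sub_url in extract_program_subpages(soup, f"https://{domain}"):
#       yield scrapy.Request(sub_url, callback=self.parse_detail,
#                            meta={"jurusan": jurusan, "url": sub_url})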
class JurusanSpider(scrapy.Spider):
    name = "jurusan"
    custom_settings = {"LOG_LEVEL": "INFO", "USER_AGENT": "Mozilla/5.0"}
    domain_to_name = {
        'akt.pnp.ac.id': 'Akuntansi',
        'an.pnp.ac.id': 'Administrasi_Niaga',
        'bing.pnp.ac.id': 'Bahasa_Inggris',
        'elektro.pnp.ac.id': 'Teknik_Elektro',
        'me.pnp.ac.id': 'Teknik_Mesin',
        'sipil.pnp.ac.id': 'Teknik_Sipil',
        'ti.pnp.ac.id': 'Teknologi_Informasi',
    }
    start_urls = [f"https://{d}/" for d in domain_to_name.keys()]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        self.bucket = SUPABASE_BUCKET
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        self.per_jurusan_pages = {}   # jurusan -> list of extracted pages
        self.rekap_prodi = {}         # jurusan -> list of study-program names
    def parse(self, response):
        domain = response.url.split("//")[1].split("/")[0]
        jurusan = self.domain_to_name.get(domain, domain)
        soup = BeautifulSoup(response.text, "html.parser")
        program_studi = []
        menu_elements = soup.find_all("a", string=re.compile("program studi", re.I))
        for menu in menu_elements:
            ul = menu.find_next("ul")
            if ul:
                for li in ul.find_all("li", recursive=False):
                    a_tag = li.find("a")
                    if a_tag:
                        item = a_tag.get_text(strip=True)
                        if item and item not in program_studi:
                            program_studi.append(item)
                        # Additionally crawl the study-program sub-page itself.
                        href = a_tag.get("href")
                        if href:
                            prodi_url = response.urljoin(href)
                            yield scrapy.Request(
                                prodi_url, callback=self.parse_detail,
                                meta={"jurusan": jurusan, "url": prodi_url}
                            )
        self.rekap_prodi[jurusan] = program_studi
        # Follow every same-domain link on the landing page for detail extraction.
        for a in soup.find_all("a", href=True):
            href = a["href"]
            if href.startswith("http") and domain in href:
                yield scrapy.Request(href, callback=self.parse_detail,
                                     meta={"jurusan": jurusan, "url": href})
            elif href.startswith("/"):
                yield scrapy.Request(response.urljoin(href), callback=self.parse_detail,
                                     meta={"jurusan": jurusan, "url": response.urljoin(href)})
    def parse_detail(self, response):
        jurusan = response.meta["jurusan"]
        url = response.meta["url"]
        soup = BeautifulSoup(response.text, "html.parser")
        # Drop navigation chrome and non-content tags before extracting text.
        for selector in ["header", "footer", "nav", "aside", ".header", ".footer", ".navbar", ".nav", ".sidebar"]:
            for tag in soup.select(selector):
                tag.decompose()
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        title_tag = soup.find("title") or soup.find("h1")
        page_title = title_tag.get_text(strip=True) if title_tag else "Halaman"
        body_text = []
        for p in soup.find_all(["p", "h1", "h2", "h3", "h4", "h5", "h6", "li"]):
            txt = p.get_text(strip=True)
            if txt:
                body_text.append(txt)
        content_text = f"""# {page_title}
URL: {url}
Tanggal Akses: {datetime.now().strftime('%d %B %Y %H:%M')}
""" + "\n\n".join(body_text)
        # Serialize any tables as pipe-separated rows appended to the page text.
        tables = soup.find_all("table")
        for i, table in enumerate(tables):
            content_text += f"\n\nTabel {i+1}\n\n"
            for row in table.find_all("tr"):
                cols = row.find_all(["td", "th"])
                row_data = [col.get_text(strip=True) for col in cols]
                content_text += " | ".join(row_data) + "\n"
        self.per_jurusan_pages.setdefault(jurusan, []).append({
            "url": url,
            "title": page_title,
            "content": content_text
        })
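    # Illustrative shape of the structure accumulated above (hypothetical
    # values, shown for orientation only):
    #
    #   self.per_jurusan_pages = {
    #       "Teknik_Sipil": [
    #           {"url": "https://sipil.pnp.ac.id/...", "title": "...", "content": "# ..."},
    #       ],
    #   }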
    def closed(self, reason):
        # One text file per department, uploaded to Supabase Storage.
        for jurusan, pages in self.per_jurusan_pages.items():
            filename = f"{jurusan.replace(' ', '_').upper()}_{self.timestamp}.txt"
            temp_path = None
            try:
                with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False, suffix=".txt") as f:
                    for page in pages:
                        f.write(page["content"] + "\n\n---\n\n")
                    temp_path = f.name
                self.supabase.storage.from_(self.bucket).upload(
                    path=filename,
                    file=temp_path,
                    file_options={"content-type": "text/plain"}
                )
                self.logger.info(f"Uploaded department file: {filename}")
            except Exception as e:
                self.logger.error(f"Failed to upload {filename}: {e}")
            finally:
                if temp_path and os.path.exists(temp_path):
                    os.remove(temp_path)
        # Summary file listing every study program per department.
        rekap_filename = f"REKAP_PROGRAM_STUDI_{self.timestamp}.txt"
        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False, suffix=".txt") as f:
                f.write(f"# REKAP PROGRAM STUDI PNP\nDiperbarui pada: {datetime.now().strftime('%d %B %Y %H:%M')}\n\n")
                total = 0
                for jurusan, daftar in self.rekap_prodi.items():
                    f.write(f"{jurusan.replace('_', ' ')}:\n")
                    for p in daftar:
                        f.write(f"- {p}\n")
                    f.write(f"Jumlah: {len(daftar)}\n\n")
                    total += len(daftar)
                f.write(f"TOTAL PROGRAM STUDI: {total}\n")
                temp_path = f.name
            self.supabase.storage.from_(self.bucket).upload(
                path=rekap_filename,
                file=temp_path,
                file_options={"content-type": "text/plain"}
            )
            self.logger.info(f"Uploaded summary file: {rekap_filename}")
        except Exception as e:
            self.logger.error(f"Failed to upload summary: {e}")
        finally:
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
if __name__ == "__main__":
    process = CrawlerProcess()
    process.crawl(JurusanSpider)
    process.start()
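# Usage sketch (assumptions: a .env file next to this script supplies the
# variables below, the target bucket already exists in Supabase Storage, and
# the file name "jurusan_spider.py" is hypothetical):
#
#   NEXT_PUBLIC_SUPABASE_URL=https://<project>.supabase.co
#   NEXT_PUBLIC_SUPABASE_SERVICE_KEY=<service-role-key>
#   NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET=pnp-bot-storage
#
#   python jurusan_spider.py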