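# Scrapy spider for Politeknik Negeri Padang (pnp.ac.id): crawls the main site and
# the penerimaan (admissions) site, extracts page text and tables, and uploads each
# page as a plain-text file to a Supabase Storage bucket.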
import scrapy
from scrapy.crawler import CrawlerProcess
from datetime import datetime
import re
import os
from supabase import create_client, Client
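
# Supabase credentials and the target bucket are read from environment variables;
# create_client() raises if the URL or key is not set.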
SUPABASE_URL = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
SUPABASE_KEY = os.environ.get("NEXT_PUBLIC_SUPABASE_SERVICE_KEY")
SUPABASE_BUCKET = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

class PNPContentSpider(scrapy.Spider):
    name = 'pnp_content_spider'
    start_urls = ['https://www.pnp.ac.id', 'https://penerimaan.pnp.ac.id']
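    # Department (jurusan) subdomains that should stay out of the crawl. The list is
    # documentation only; filtering is done via the 'jurusan' keyword check in parse().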
    excluded_subdomains = [
        'akt.pnp.ac.id',
        'an.pnp.ac.id',
        'bing.pnp.ac.id',
        'elektro.pnp.ac.id',
        'me.pnp.ac.id',
        'sipil.pnp.ac.id',
        'ti.pnp.ac.id'
    ]

    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'RETRY_TIMES': 3,
        'HTTPCACHE_ENABLED': False,
        'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
    }

    def format_paragraph(self, text: str) -> str:
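        # Accumulate whole sentences and stop once the running word count falls in the 50-150 range.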
        sentences = re.split(r'(?<=[.!?]) +', text.strip())
        paragraph = ''
        word_count = 0
        for sentence in sentences:
            words = sentence.split()
            word_count += len(words)
            paragraph += sentence + ' '
            if 50 <= word_count <= 150:
                break
        return paragraph.strip()

    def parse(self, response):
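        # Walk the WordPress block-navigation menu and queue every top-level and
        # submenu page for content extraction, skipping department (jurusan) links.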
        self.logger.info(f"Processing main page: {response.url}")
        nav_items = response.css('ul.wp-block-navigation__container > li.wp-block-navigation-item')
        for item in nav_items:
            main_title = item.css('a.wp-block-navigation-item__content span.wp-block-navigation-item__label::text').get()
            if not main_title:
                main_title = item.css('a.wp-block-navigation-item__content::text').get('').strip()
            main_link = item.css('a.wp-block-navigation-item__content::attr(href)').get()
            if main_link and not main_link.startswith('#'):
                main_link = response.urljoin(main_link)
                if "jurusan" in main_link.lower():
                    continue
                yield scrapy.Request(main_link, callback=self.parse_content, meta={'page_title': main_title, 'menu_path': main_title})
            submenus = item.css('ul.wp-block-navigation__submenu-container > li.wp-block-navigation-item')
            for submenu in submenus:
                submenu_title = submenu.css('a.wp-block-navigation-item__content span.wp-block-navigation-item__label::text').get()
                if not submenu_title:
                    submenu_title = submenu.css('a.wp-block-navigation-item__content::text').get('').strip()
                submenu_link = submenu.css('a.wp-block-navigation-item__content::attr(href)').get()
                if submenu_link and not submenu_link.startswith('#'):
                    submenu_link = response.urljoin(submenu_link)
                    if "jurusan" in submenu_link.lower():
                        continue
                    menu_path = f"{main_title} > {submenu_title}" if main_title else submenu_title
                    yield scrapy.Request(submenu_link, callback=self.parse_content, meta={'page_title': submenu_title, 'menu_path': menu_path})

    def parse_content(self, response):
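        # Extract readable text, admissions-widget content, and tables from the page,
        # then upload the formatted result to Supabase Storage as a .txt file.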
        page_title = response.meta.get('page_title', 'Unknown Page')
        menu_path = response.meta.get('menu_path', '')
        if page_title == 'Unknown Page':
            page_title = response.css('h1.entry-title::text, h1.page-title::text').get('').strip()
        self.logger.info(f"Extracting content from: {response.url} ({page_title})")
        paragraphs = []
        # Extra logic for the penerimaan (admissions) pages
        if 'penerimaan.pnp.ac.id' in response.url:
            self.logger.info("Detected penerimaan page, extracting special widget content.")
            widgets = response.css('div.widget_circleicon-widget, div.elementor-widget-container')
            for widget in widgets:
                title = widget.css('h4::text, h3::text').get('')
                desc = widget.css('p::text').get('')
                link = widget.css('a::attr(href)').get()
                if title and desc:
                    combined = f"{title.strip()}. {desc.strip()}"
                    if link:
                        combined += f" (Link: {response.urljoin(link)})"
                    paragraphs.append(combined)
        # Normal content extraction
        content_selectors = [
            'div.entry-content', 'article.post', 'main.site-main',
            'div.content', 'div.main-content', 'div#content', 'div.page-content'
        ]
        for selector in content_selectors:
            content_area = response.css(selector)
            if content_area:
                elems = content_area.css('p, h1, h2, h3, h4, h5, h6, li')
                for elem in elems:
                    text = ' '.join(elem.css('*::text').getall()).strip()
                    if text:
                        links = elem.css('a::attr(href)').getall()
                        for link in links:
                            text += f" (Link: {response.urljoin(link)})"
                        paragraphs.append(text)
            if paragraphs:
                break
        # Fallback: grab all body text if no main content area was found
        if not paragraphs:
            paragraphs = [t.strip() for t in response.css('body *::text').getall() if t.strip()]
        # Format paragraphs
        formatted_paragraphs = []
        for para in paragraphs:
            para = para.replace('\n', ' ').strip()
            if len(para.split()) >= 10:
                formatted_paragraphs.append(self.format_paragraph(para))
        # Final formatted text
        content_text = f"""# {page_title}
Tanggal: {datetime.now().strftime('%d %B %Y')}
URL: {response.url}
""" + "\n\n".join(formatted_paragraphs)
        # Table extraction
        tables = response.css('table')
        table_output = []
        for table in tables:
            for row in table.css('tr'):
                cells = row.css('th, td')
                row_data = []
                for cell in cells:
                    cell_text = ' '.join(cell.css('*::text').getall()).strip()
                    if link := cell.css('a::attr(href)').get():
                        cell_text += f" (Link: {response.urljoin(link)})"
                    if cell_text:
                        row_data.append(cell_text)
                if row_data:
                    table_output.append(" - ".join(row_data))
        if table_output:
            content_text += "\n\n# Tabel Data\n\n" + "\n".join(table_output)
        # Generate safe filename
        safe_title = re.sub(r'[^\w\s-]', '', page_title).strip().lower()
        safe_title = re.sub(r'[-\s]+', '-', safe_title)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_title}_{timestamp}.txt"
        # Upload to Supabase
        try:
            supabase.storage.from_(SUPABASE_BUCKET).upload(
                path=filename,
                file=content_text.encode('utf-8'),
                file_options={"content-type": "text/plain"}
            )
            self.logger.info(f"Uploaded {filename} successfully.")
        except Exception as e:
            self.logger.error(f"Upload error for {filename}: {str(e)}")
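        # Emit a summary item so Scrapy feed exports and logs record what was uploaded.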
        yield {
            'url': response.url,
            'title': page_title,
            'menu_path': menu_path,
            'uploaded_as': filename,
            'timestamp': datetime.now().isoformat()
        }
        # Follow header/nav links for additional scraping on the same domain
        current_domain = response.url.split('//')[1].split('/')[0]
        if 'pnp.ac.id' in current_domain:
            header_links = []
            for sel in ['header a::attr(href)', 'nav a::attr(href)', '.navbar a::attr(href)']:
                header_links.extend(response.css(sel).getall())
            for link in set(link for link in header_links if link and not link.startswith(('#', 'javascript:'))):
                full_link = response.urljoin(link)
                if current_domain in full_link:
                    yield scrapy.Request(
                        url=full_link,
                        callback=self.parse_content,
                        meta={'page_title': 'Header Link', 'menu_path': f"{menu_path} > Header"}
                    )


if __name__ == '__main__':
    process = CrawlerProcess({
        'USER_AGENT': 'Mozilla/5.0',
        'ROBOTSTXT_OBEY': True,
        'LOG_LEVEL': 'INFO',
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_TIMEOUT': 60,
        'RETRY_TIMES': 3,
        'HTTPCACHE_ENABLED': False,
    })
    process.crawl(PNPContentSpider)
    process.start()
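
# Example invocation (a sketch; assumes the three Supabase environment variables above
# are exported and this file is saved locally, e.g. as pnp_content_spider.py):
#   NEXT_PUBLIC_SUPABASE_URL=... NEXT_PUBLIC_SUPABASE_SERVICE_KEY=... \
#   NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET=... python pnp_content_spider.py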