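"""Scrapy spider for https://sipeg.pnp.ac.id/.

Crawls the staff directory, extracts the structural-official and staff tables,
and uploads a plain-text summary to Supabase storage when the spider closes.
Requires the NEXT_PUBLIC_SUPABASE_URL, NEXT_PUBLIC_SUPABASE_SERVICE_KEY and
NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET environment variables.
"""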
import scrapy
from scrapy.crawler import CrawlerProcess
from datetime import datetime
import re
from supabase import create_client
import os
class DosenSpider(scrapy.Spider):
name = 'dosen_spider'
start_urls = ['https://sipeg.pnp.ac.id/']
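    # Crawl politely: obey robots.txt, one concurrent request, 2 s delay between downloads.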
custom_settings = {
'DOWNLOAD_DELAY': 2,
'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'ROBOTSTXT_OBEY': True,
'LOG_LEVEL': 'INFO',
'CONCURRENT_REQUESTS': 1,
'DOWNLOAD_TIMEOUT': 100,
'RETRY_TIMES': 3
}
def __init__(self, *args, **kwargs):
super(DosenSpider, self).__init__(*args, **kwargs)
# Initialize Supabase client
self.supabase = create_client(
os.environ.get("NEXT_PUBLIC_SUPABASE_URL"),
os.environ.get("NEXT_PUBLIC_SUPABASE_SERVICE_KEY")
)
self.storage_bucket = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")
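        # Accumulates every scraped item so closed() can build a single text report.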
self.collected_data = []
def parse(self, response):
        # Extract the main menu items and their submenus
main_menu_items = response.css('li.level1')
for menu_item in main_menu_items:
menu_title = menu_item.css('span.bg::text').get('').strip()
main_link = menu_item.css('a::attr(href)').get()
if main_link:
main_link = response.urljoin(main_link)
                # Follow the main-menu link
yield scrapy.Request(
url=main_link,
callback=self.parse_page,
meta={'page_title': menu_title, 'page_number': 1}
)
            # Check for submenu entries
submenus = menu_item.css('li.level2')
for submenu in submenus:
submenu_title = submenu.css('span.bg::text').get('').strip()
submenu_link = submenu.css('a::attr(href)').get()
if submenu_link:
submenu_link = response.urljoin(submenu_link)
                    # Follow the submenu link
yield scrapy.Request(
url=submenu_link,
callback=self.parse_page,
meta={'page_title': submenu_title, 'page_number': 1}
)
def parse_page(self, response):
page_title = response.meta.get('page_title', '')
page_number = response.meta.get('page_number', 1)
        # Check for "data not yet available" messages
page_text = ' '.join(response.css('body ::text').getall()).lower()
unavailable_messages = [
'data staf pengajar belum tersedia',
'data staf administrasi belum tersedia',
'data staf teknisi belum tersedia'
]
if any(msg in page_text for msg in unavailable_messages):
            self.logger.info(f"Data not available on page: {response.url}")
return
        # Look for data tables on the page
tables = response.css('table.table-landscape, table.table, table.table-bordered')
if tables:
for table in tables:
                # Read the table headers to determine the table type
headers = [h.strip() for h in table.css('th::text').getall()]
                # Classify the table by its headers
if 'Jabatan' in headers and 'Pejabat' in headers:
yield from self.extract_officials_table(table, page_title)
elif 'Nama' in headers and 'NIP' in headers:
                    # Determine the staff type from the page title
staff_type = self.determine_simple_staff_type(page_title)
yield from self.extract_staff_table(table, page_title, staff_type, page_number)
else:
self.logger.info(f"No tables found on page: {response.url}")
# Improved pagination handling
current_url = response.url
base_url = current_url.split('?')[0] if '?' in current_url else current_url
# Extract p value from current URL if it exists
current_p = 0
if 'p=' in current_url:
try:
current_p = int(current_url.split('p=')[1].split('&')[0])
except (ValueError, IndexError):
current_p = 0
# Determine items per page based on staff type
staff_type = self.determine_simple_staff_type(page_title)
if staff_type == 'staff_pengajar':
items_per_page = 30
elif staff_type in ['staff_administrasi', 'staff_teknisi']:
items_per_page = 25
else:
items_per_page = 0 # No pagination for jabatan
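        # Pagination strategy: prefer the explicit "Next" link; otherwise build the
        # next URL from the record offset (?p=<offset>), advancing by the page size.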
# First try to get the Next link using XPath
next_page = None
next_link = response.xpath('//span[@class="table-link"]/a[contains(text(), "Next")]/@href').get()
if next_link:
next_page = response.urljoin(next_link)
elif current_p >= 0 and items_per_page > 0:
            next_p = current_p + items_per_page
next_page = f"{base_url}?p={next_p}"
self.logger.info(f"Constructed next page URL with p parameter: {next_page}")
# Fallback to other pagination methods if specific method failed
if not next_page:
pagination_xpath_patterns = [
'//ul[contains(@class, "pagination")]/li/a[contains(text(), "Next")]/@href',
'//ul[contains(@class, "pagination")]/li/a[contains(text(), "»")]/@href',
f'//ul[contains(@class, "pagination")]/li/a[contains(text(), "{page_number + 1}")]/@href',
'//a[@class="next page-numbers"]/@href',
]
for xpath in pagination_xpath_patterns:
next_page_link = response.xpath(xpath).get()
if next_page_link:
next_page = response.urljoin(next_page_link)
self.logger.info(f"Found next page link using XPath: {next_page}")
break
# Generic parameter detection as last resort
if not next_page:
if 'page=' in current_url:
next_page = current_url.replace(f'page={page_number}', f'page={page_number + 1}')
            # next_page is None in this branch, so only key off the current URL; skip
            # when the page type has no offset-based pagination (items_per_page == 0).
            elif 'p=' in current_url and items_per_page > 0:
                next_page = current_url.replace(f'p={current_p}', f'p={current_p + items_per_page}')
elif 'halaman=' in current_url:
next_page = current_url.replace(f'halaman={page_number}', f'halaman={page_number + 1}')
elif 'page/' in current_url:
next_page = current_url.replace(f'page/{page_number}', f'page/{page_number + 1}')
if next_page:
next_page_number = page_number + 1
            if 'p=' in next_page and items_per_page > 0:  # avoid dividing by a zero page size
try:
p_value = int(next_page.split('p=')[1].split('&')[0])
next_page_number = (p_value // items_per_page) + 1
except (ValueError, IndexError):
pass
self.logger.info(f"Following to next page: {next_page} (Page {next_page_number})")
yield scrapy.Request(
url=next_page,
callback=self.parse_page,
meta={'page_title': page_title, 'page_number': next_page_number}
)
    def determine_simple_staff_type(self, page_title):
        """Determine the staff type from the page title."""
page_title_lower = page_title.lower()
if any(word in page_title_lower for word in ['dosen', 'pengajar', 'akademik', 'jurusan']):
return 'staff_pengajar'
elif any(word in page_title_lower for word in ['administrasi', 'admin', 'tata usaha', 'pegawai']):
return 'staff_administrasi'
elif any(word in page_title_lower for word in ['teknisi', 'lab', 'teknik', 'laboratorium']):
return 'staff_teknisi'
return 'staff_lainnya'
def extract_officials_table(self, table, page_title):
rows = table.css('tr')
for row in rows:
row_html = row.get()
period_match = re.search(r'<!--\s*<td[^>]*>(.*?)</td>\s*-->', row_html)
period = period_match.group(1).strip() if period_match else ""
cells = row.css('td')
if len(cells) < 3:
continue
number = cells[0].css('::text').get('').strip()
position = cells[1].css('::text').get('').strip()
official = cells[2].css('::text').get('').strip()
item = {
'halaman': page_title,
'tipe': 'jabatan',
'nomor': number,
'jabatan': position,
'pejabat': official,
'periode': period
}
self.collected_data.append(item)
yield item
def extract_staff_table(self, table, page_title, staff_type, page_number):
rows = table.css('tr')
rows = rows[1:] if len(rows) > 1 else []
for row in rows:
cells = row.css('td')
if len(cells) < 3:
continue
number = cells[0].css('::text').get('').strip() if len(cells) > 0 else ""
name_cell = cells[1] if len(cells) > 1 else None
name = ""
if name_cell:
name_link = name_cell.css('a::text').get()
name = name_link.strip() if name_link else name_cell.css('::text').get('').strip()
detail_url = name_cell.css('a::attr(href)').get()
nip = cells[2].css('::text').get('').strip() if len(cells) > 2 else ""
department = cells[3].css('::text').get('').strip() if len(cells) > 3 else ""
if not name and not nip:
continue
item = {
'halaman': page_title,
'tipe': staff_type,
'halaman_ke': page_number,
'nomor': number,
'nama': name,
'nip': nip,
'jurusan': department,
'detail': detail_url
}
self.collected_data.append(item)
yield item
def closed(self, reason):
"""Called when spider closes - formats data and uploads to Supabase"""
# Generate text content
text_content = self.generate_text_output()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"data_dosen_{timestamp}.txt"
# Upload to Supabase
try:
# Upload new file
res = self.supabase.storage.from_(self.storage_bucket).upload(
path=filename,
file=text_content.encode('utf-8'),
file_options={"content-type": "text/plain"},
)
if res:
self.logger.info(f"Successfully uploaded {filename} to Supabase storage")
else:
self.logger.error(f"Failed to upload {filename} to Supabase")
except Exception as e:
self.logger.error(f"Error uploading to Supabase: {str(e)}")
def generate_text_output(self):
output = []
output.append(f"# Data Dosen dan Staff PNP\n")
output.append(f"Diperbarui pada: {datetime.now().strftime('%d %B %Y %H:%M')}\n\n")
grouped = {}
for item in self.collected_data:
tipe = item.get('tipe', 'lainnya')
grouped.setdefault(tipe, []).append(item)
section_titles = {
'jabatan': 'Daftar Jabatan Struktural',
'staff_pengajar': 'Daftar Dosen dan Pengajar',
'staff_administrasi': 'Daftar Staff Administrasi',
'staff_teknisi': 'Daftar Staff Teknisi',
'staff_lainnya': 'Daftar Staff Lainnya'
}
for tipe, items in grouped.items():
title = section_titles.get(tipe, tipe.capitalize())
output.append(f"# {title}\n")
output.append(f"Jumlah data: {len(items)}\n\n")
for item in items:
if tipe == 'jabatan':
paragraph = f"{item['pejabat']} menjabat sebagai {item['jabatan']}."
if item.get('periode'):
paragraph += f" Masa jabatan berlangsung selama {item['periode']}."
else:
paragraph = f"{item['nama']} adalah staf dengan NIP {item['nip']}."
if item.get('jurusan'):
paragraph += f" Ia bertugas di {item['jurusan']}."
if item.get('detail'):
paragraph += f" Informasi lebih lengkap tersedia di {item['detail']}."
output.append(paragraph + "\n\n")
return ''.join(output)
if __name__ == '__main__':
process = CrawlerProcess()
process.crawl(DosenSpider)
process.start()
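
# Example run (hypothetical file name dosen_spider.py; the three
# NEXT_PUBLIC_SUPABASE_* environment variables must be set beforehand):
#   export NEXT_PUBLIC_SUPABASE_URL=https://<project>.supabase.co
#   export NEXT_PUBLIC_SUPABASE_SERVICE_KEY=<service-key>
#   export NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET=<bucket-name>
#   python dosen_spider.py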