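"""scrapping/jadwal_scrap.py

Scrapy spider that scrapes lecture schedules (jadwal perkuliahan) from
presensi.pnp.ac.id and elektro.pnp.ac.id, flattens each timetable grid into a
Markdown-style text document, and uploads one file per department (jurusan)
to Supabase Storage.

Requires NEXT_PUBLIC_SUPABASE_URL, NEXT_PUBLIC_SUPABASE_SERVICE_KEY and
NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET to be set in the environment.
"""
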
import scrapy
from scrapy.crawler import CrawlerProcess
import os
import re
from datetime import datetime
from supabase import create_client
from io import StringIO

class PnpSpider(scrapy.Spider):
    name = 'pnp_spider'
    allowed_domains = ['presensi.pnp.ac.id', 'elektro.pnp.ac.id']
    start_urls = [
        'https://presensi.pnp.ac.id/',
        'https://elektro.pnp.ac.id/jadwal-perkuliahan-jurusan-teknik-elektro/jadwal-perkuliahan-program-studi-teknik-listrik/'
    ]
    excluded_departments = ['elektronika', 'telkom', 'listrik']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize Supabase client
        url = os.environ.get("NEXT_PUBLIC_SUPABASE_URL")
        key = os.environ.get("NEXT_PUBLIC_SUPABASE_SERVICE_KEY")
        self.supabase = create_client(url, key)
        self.storage_bucket = os.environ.get("NEXT_PUBLIC_SUPABASE_STORAGE_BUCKET")
        self.file_buffers = {}  # Dictionary to store StringIO objects
        self.current_date = datetime.now().strftime("%Y-%m-%d")

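    # Everything is written to in-memory StringIO buffers (one per jurusan) and
    # only uploaded to Supabase once the crawl finishes, in the closed() hook below.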
    def closed(self, reason):
        print(f"Spider closing with reason: {reason}")
        print(f"Uploading {len(self.file_buffers)} files to Supabase...")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for jurusan_id, buffer in self.file_buffers.items():
            filename = f"{jurusan_id}_{timestamp}.txt"
            content = buffer.getvalue()
            print(f"Uploading {filename} with content length: {len(content)}")
            success = self.upload_to_supabase(filename, content)
            if success:
                print(f"✅ Successfully uploaded {filename}")
            else:
                print(f"❌ Failed to upload {filename}")
            buffer.close()

    def upload_to_supabase(self, filename, content):
        """Upload content directly to Supabase Storage"""
        try:
            # Upload new content. Note: Supabase Storage typically rejects an upload
            # to a path that already exists (unless upserting), so the timestamped
            # filenames built in closed() keep each run's files distinct.
            self.supabase.storage.from_(self.storage_bucket).upload(
                path=filename,
                file=content.encode('utf-8'),
                file_options={"content-type": "text/plain"}
            )
            return True
        except Exception as e:
            print(f"Upload error: {str(e)}")
            return False

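    # parse() handles both start URLs: the elektro.pnp.ac.id page is parsed directly,
    # while the presensi.pnp.ac.id landing page is crawled for per-department links
    # (skipping the departments listed in excluded_departments).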
    def parse(self, response):
        if 'elektro.pnp.ac.id' in response.url:
            jurusan_id = 'teknik_elektro'
            jurusan_name = 'Jurusan Teknik Elektro'
            return self.parse_elektro_page(response, jurusan_id, jurusan_name)
        print("Memulai scraping dari halaman utama...")
        jurusan_links = set(response.xpath('//article[contains(@class, "section")]//a/@href').getall())
        for link in jurusan_links:
            if any(excluded in link.lower() for excluded in self.excluded_departments):
                continue
            jurusan_url = response.urljoin(link)
            jurusan_id = self.extract_jurusan_id(link)
            yield scrapy.Request(jurusan_url,
                                 callback=self.parse_jurusan,
                                 meta={'jurusan_id': jurusan_id})

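    # Each <table> on the Elektro page is one class timetable: days of the week in
    # the x-axis header cells and lesson periods in the y-axis header cells.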
    def parse_elektro_page(self, response, jurusan_id, jurusan_name):
        if jurusan_id not in self.file_buffers:
            self.initialize_document_buffer(jurusan_id, jurusan_name)
        output_buffer = self.file_buffers[jurusan_id]
        tables = response.xpath('//table')
        if not tables:
            return
        for table_idx, table in enumerate(tables):
            caption_text = self.get_table_caption(table, table_idx)
            class_info = self.clean_class_info(caption_text, table)
            if not class_info:
                continue
            self.write_section_header(output_buffer, class_info)
            days = table.xpath('.//thead//th[@class="xAxis"]/text()').getall() or \
                table.xpath('.//thead//th[contains(@class, "xAxis")]/text()').getall()
            time_slots = table.xpath('.//tbody//th[@class="yAxis"]/text()').getall() or \
                table.xpath('.//tbody//th[contains(@class, "yAxis")]/text()').getall()
            if not days or not time_slots:
                continue
            schedule_grid = self.build_schedule_grid(days, time_slots)
            self.process_table_rows(table, schedule_grid, days, time_slots)
            self.write_schedule_to_buffer(output_buffer, schedule_grid, days, time_slots)

    def initialize_document_buffer(self, jurusan_id, jurusan_name):
        """Initialize a new document with proper title and metadata"""
        self.file_buffers[jurusan_id] = StringIO()
        buffer = self.file_buffers[jurusan_id]
        # Write document title and metadata
        buffer.write(f"# Jadwal Perkuliahan {jurusan_name}\n\n")
        buffer.write(f"**Jurusan:** {jurusan_name}\n")
        buffer.write(f"**Tanggal Update:** {self.current_date}\n")
        buffer.write("**Sumber:** Politeknik Negeri Padang\n\n")
        buffer.write("---\n\n")

    def get_table_caption(self, table, table_idx):
        """Extract and clean table caption text"""
        caption = table.xpath('.//caption//text()').getall()
        caption_text = ' '.join(caption).strip()
        if not caption_text:
            caption_text = table.xpath('preceding::h2[1]//text()|preceding::h3[1]//text()|preceding::h4[1]//text()').get()
            caption_text = caption_text.strip() if caption_text else f"Jadwal Kelas {table_idx + 1}"
        return caption_text

    def clean_class_info(self, caption_text, table):
        """Combine and clean class information"""
        thead_class_info = ' '.join(table.xpath('.//thead/tr[1]//text()').getall()).strip()
        class_info = f"{caption_text} {thead_class_info}" if thead_class_info else caption_text
        return re.sub(r'\s+', ' ', class_info).strip()

    def write_section_header(self, buffer, class_info):
        """Write a section header for each class schedule"""
        buffer.write(f"## Jadwal Perkuliahan {class_info}\n\n")
        buffer.write("Berikut adalah jadwal perkuliahan untuk kelas tersebut, diurutkan berdasarkan hari dan waktu:\n\n")

    def build_schedule_grid(self, days, time_slots):
        """Initialize the schedule grid structure"""
        return {day: {time: 'kosong' for time in time_slots} for day in days}

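    # The timetable HTML merges cells with rowspan (a course spanning several
    # periods) and colspan (a course shared across several columns), so filling the
    # grid has to track which columns are already occupied by a spanning cell from
    # an earlier row.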
    def process_table_rows(self, table, schedule_grid, days, time_slots):
        """Process table rows respecting rowspans and colspans"""
        rows = table.xpath('.//tbody/tr[not(contains(@class, "foot"))]')
        # Maps (column index, starting row index) -> (rows still to fill, content)
        active_rowspans = {}
        for row_idx, row in enumerate(rows):
            if row_idx >= len(time_slots):
                continue
            current_time = time_slots[row_idx]
            filled_columns = set()
            # Apply active rowspans
            self.apply_active_rowspans(active_rowspans, schedule_grid, days, current_time, filled_columns, row_idx)
            # Process current row cells
            cells = row.xpath('./td')
            col_idx = 0
            for cell in cells:
                # Skip columns already occupied by a cell spanning down from above
                while col_idx < len(days) and col_idx in filled_columns:
                    col_idx += 1
                if col_idx >= len(days):
                    break
                cell_content = self.process_cell_content(cell)
                rowspan = int(cell.xpath('./@rowspan').get() or 1)
                colspan = int(cell.xpath('./@colspan').get() or 1)
                self.update_schedule_grid(schedule_grid, days, current_time, col_idx, colspan, cell_content)
                self.update_active_rowspans(active_rowspans, row_idx, col_idx, colspan, rowspan, cell_content)
                col_idx += colspan

    def apply_active_rowspans(self, active_rowspans, schedule_grid, days, current_time, filled_columns, row_idx):
        """Apply content from cells with rowspan to current row"""
        rowspans_to_remove = []
        for (rs_col_idx, rs_row_start_idx), (rowspan_left, content) in active_rowspans.items():
            if rowspan_left > 0 and rs_col_idx < len(days):
                day = days[rs_col_idx]
                schedule_grid[day][current_time] = content
                filled_columns.add(rs_col_idx)
                active_rowspans[(rs_col_idx, rs_row_start_idx)] = (rowspan_left - 1, content)
                if rowspan_left - 1 <= 0:
                    rowspans_to_remove.append((rs_col_idx, rs_row_start_idx))
        for key in rowspans_to_remove:
            del active_rowspans[key]

    def process_cell_content(self, cell):
        """Extract and clean cell content"""
        content = ' '.join(cell.xpath('.//text()').getall()).strip()
        return 'kosong' if not content or content == '---' else content

    def update_schedule_grid(self, schedule_grid, days, current_time, col_idx, colspan, content):
        """Update schedule grid with cell content"""
        for c in range(colspan):
            current_col_idx = col_idx + c
            if current_col_idx < len(days):
                schedule_grid[days[current_col_idx]][current_time] = content

    def update_active_rowspans(self, active_rowspans, row_idx, col_idx, colspan, rowspan, content):
        """Track cells with rowspan for future rows"""
        if rowspan > 1:
            for c in range(colspan):
                active_rowspans[(col_idx + c, row_idx)] = (rowspan - 1, content)

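    # NOTE: format_course_entry is not called anywhere else in this spider; it is a
    # helper for splitting a raw cell string into course code, name, lecturer and
    # room, and the parsing below assumes a fairly specific cell format.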
    def format_course_entry(self, time_slots, course_info):
        """Format a course entry for optimal RAG retrieval"""
        # Parse course information
        parts = course_info.split()
        course_code = parts[0] if parts and len(parts[0]) == 7 and parts[0][:3].isalpha() and parts[0][3:].isdigit() else ""
        course_name = ""
        lecturer = ""
        room = ""
        # Extract course name, lecturer, and room
        if "_" in course_info:
            # Format: COURSE_CODE Course_Name_P Lecturer Room
            course_parts = course_info.split("_P")
            if len(course_parts) > 1:
                course_name = course_parts[0].replace(course_code, "").strip()
                remaining = course_parts[1].strip().split()
                lecturer = " ".join(remaining[:-1])
                room = remaining[-1] if remaining else ""
        else:
            # Alternative format
            course_name = " ".join(parts[1:-2]) if len(parts) > 3 else course_info.replace(course_code, "").strip()
            lecturer = parts[-2] if len(parts) > 1 else ""
            room = parts[-1] if parts else ""
        # Format time range
        time_range = self.format_time_range(time_slots)
        # Create structured information
        return {
            "time_range": time_range,
            "course_code": course_code,
            "course_name": course_name,
            "lecturer": lecturer,
            "room": room
        }

    def write_schedule_to_buffer(self, buffer, schedule_grid, days, time_slots):
        """Write the schedule per day, merging consecutive identical slots into ranges"""
        for day in days:
            current_course = None
            current_times = []
            day_schedule = []
            for time_slot in time_slots:
                course = schedule_grid[day][time_slot]
                if course == current_course:
                    current_times.append(time_slot)
                else:
                    if current_course and current_course.lower() != 'kosong':
                        time_range = self.format_time_range(current_times)
                        entry = f"- {day} {time_range} | {current_course}"
                        day_schedule.append(entry)
                    current_course = course
                    current_times = [time_slot]
            # Add the final entry of the day
            if current_course and current_course.lower() != 'kosong':
                time_range = self.format_time_range(current_times)
                entry = f"- {day} {time_range} | {current_course}"
                day_schedule.append(entry)
            # Write the day's entries to the buffer
            for entry in day_schedule:
                buffer.write(entry + "\n")
            buffer.write("\n")  # blank line between days

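    # Example (assuming period slots are written like "07.00-07.50"):
    #   ["07.00-07.50", "07.50-08.40"] -> "07.00 - 08.40"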
    def format_time_range(self, time_slots):
        """Format multiple time slots into a readable range"""
        if len(time_slots) == 1:
            return time_slots[0]
        first_start = time_slots[0].split('-')[0].strip()
        last_end = time_slots[-1].split('-')[-1].strip()
        return f"{first_start} - {last_end}"

    def extract_jurusan_id(self, link):
        match = re.search(r'department\?dep=(\d+)', link)
        return match.group(1) if match else f"unknown_{hash(link) % 1000}"

    def parse_jurusan(self, response):
        jurusan_id = response.meta.get('jurusan_id')
        jurusan_name = self.extract_title_jurusan_name(response)
        groups_days_horizontal_link = response.xpath('//td/a[contains(@href, "groups_days_horizontal") and not(contains(@href, "subgroups_days_horizontal"))]/@href').get()
        if groups_days_horizontal_link:
            groups_days_horizontal_url = response.urljoin(groups_days_horizontal_link)
            safe_jurusan_name = re.sub(r'[^\w\-_\. ]', '_', jurusan_name)
            yield scrapy.Request(groups_days_horizontal_url,
                                 callback=self.parse_jadwal,
                                 meta={'jurusan_id': safe_jurusan_name, 'jurusan_name': jurusan_name})

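    # The "groups_days_horizontal" pages carry the generated class timetables
    # (tables with ids like "table_*" and xAxis/yAxis header cells); they are parsed
    # with the same grid-building helpers as the Elektro page.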
    def parse_jadwal(self, response):
        jurusan_id = response.meta.get('jurusan_id')
        jurusan_name = response.meta.get('jurusan_name')
        if jurusan_id not in self.file_buffers:
            self.initialize_document_buffer(jurusan_id, jurusan_name)
        output_buffer = self.file_buffers[jurusan_id]
        tables = response.xpath('//table[contains(@id, "table_")]') or response.xpath('//table')
        for table in tables:
            caption_text = self.get_table_caption(table, 0)
            class_info = self.clean_class_info(caption_text, table)
            if not class_info:
                continue
            self.write_section_header(output_buffer, class_info)
            days = table.xpath('.//thead//th[@class="xAxis"]/text()').getall()
            time_slots = table.xpath('.//tbody/tr[not(contains(@class, "foot"))]/th[@class="yAxis"]/text()').getall()
            if not days or not time_slots:
                continue
            schedule_grid = self.build_schedule_grid(days, time_slots)
            self.process_table_rows(table, schedule_grid, days, time_slots)
            self.write_schedule_to_buffer(output_buffer, schedule_grid, days, time_slots)

    def extract_title_jurusan_name(self, response):
        title = response.xpath('//title/text()').get()
        return title.strip() if title else f"Jurusan_{response.meta.get('jurusan_id')}"

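
# Running this module directly starts a one-off crawl with a polite configuration
# (1 second download delay plus AutoThrottle) and exits when the crawl finishes.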
if __name__ == "__main__":
    process = CrawlerProcess(settings={
        "LOG_LEVEL": "INFO",
        "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "DOWNLOAD_DELAY": 1,
        "AUTOTHROTTLE_ENABLED": True,
    })
    process.crawl(PnpSpider)
    process.start()