from langchain_community.chat_models import ChatOllama
from app.document_processor import load_vector_store_from_supabase
from app.prompts import sahabat_prompt
from app.db import supabase
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_community.llms import Replicate
import os
import re
import time
import json
import random
import logging
import statistics
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
from langsmith import traceable, Client

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class EvaluationResult:
    question: str
    answer: str
    ground_truth: str
    correctness_score: float
    correctness_explanation: str
    relevance_score: float
    relevance_explanation: str
    groundedness_score: float
    groundedness_explanation: str
    retrieval_score: float
    retrieval_explanation: str
    retrieved_docs: List[str]
    response_time: float
    timestamp: str


class PNPRAGEvaluator:
    def __init__(self):
        logger.info("Initializing PNP RAG Evaluator...")

        # Load vector store
        self.vector_store = load_vector_store_from_supabase(
            supabase, "pnp-bot-storage-archive", "vector_store"
        )

        # Initialize Replicate LLM
        self.llm = Replicate(
            model="fauziisyrinapridal/sahabat-ai-v1:afb9fa89fe786362f619fd4fef34bd1f7a4a4da23073d8a6fbf54dcbe458f216",
            model_kwargs={"temperature": 0.1, "top_p": 0.9, "max_new_tokens": 10000},
            replicate_api_token=os.getenv("REPLICATE_API_TOKEN")
        )
        # self.llm = ChatOllama(
        #     model="llama3",
        #     temperature=0.1,
        #     max_tokens=10000,
        #     top_p=0.9
        # )

        # Initialize memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key='answer'
        )

        # Initialize RAG chain
        self.chain = ConversationalRetrievalChain.from_llm(
            self.llm,
            retriever=self.vector_store.as_retriever(search_kwargs={"k": 6}),
            combine_docs_chain_kwargs={"prompt": sahabat_prompt},
            return_source_documents=True,
            memory=self.memory
        )

        # Initialize evaluator LLM
        self.ollama_eval = ChatOllama(model="llama3", temperature=0.1)

        logger.info("PNP RAG Evaluator initialized successfully!")

    def ask(self, question: str) -> Dict[str, Any]:
        """Generate answer using RAG chain with error handling"""
        start_time = time.time()
        try:
            result = self.chain({"question": question})
            result['response_time'] = time.time() - start_time
            logger.debug(f"RAG response generated in {result['response_time']:.2f}s")
            return result
        except Exception as e:
            logger.error(f"Error in RAG generation for question '{question[:50]}...': {e}")
            return {
                'answer': f"Error generating response: {str(e)}",
                'source_documents': [],
                'response_time': time.time() - start_time
            }

    def evaluate_score(self, prompt: str, metric_name: str, max_retries: int = 3) -> Tuple[float, str]:
        """Evaluate with enhanced error handling and logging"""
        for attempt in range(max_retries):
            try:
                logger.debug(f"Evaluating {metric_name} (attempt {attempt + 1})")
                response = self.ollama_eval.invoke(prompt)
                content = response.content if hasattr(response, 'content') else str(response)

                # Enhanced parsing with multiple strategies
                score, explanation = self._parse_evaluation_response(content)
                if score is not None:
                    logger.debug(f"{metric_name} evaluation successful: {score:.3f}")
                    return score, explanation

                logger.warning(f"Could not parse {metric_name} evaluation (attempt {attempt + 1})")
            except Exception as e:
                logger.warning(f"{metric_name} evaluation attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    return 0.0, f"Evaluation failed after {max_retries} attempts: {str(e)}"
            time.sleep(1)

        return 0.0, "Maximum retries exceeded"
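    # The evaluator LLM is asked (via the prompts built below) to reply in a
    # fixed Indonesian format, e.g. (illustrative only, not from a live run):
    #
    #   Skor: 0.85
    #   Penjelasan: Jawaban sesuai dengan referensi.
    #
    # _parse_evaluation_response() falls back to two looser strategies when
    # the model drifts from this format.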
    def _parse_evaluation_response(self, content: str) -> Tuple[Optional[float], str]:
        """Enhanced parsing for evaluation responses"""
        try:
            # Strategy 1: standard "Skor: ... Penjelasan: ..." format
            if "Skor:" in content and "Penjelasan:" in content:
                score_section = content.split("Skor:")[1].split("Penjelasan:")[0].strip()
                explanation_section = content.split("Penjelasan:")[1].strip()

                # Extract numeric score
                score_match = re.search(r'(\d+\.?\d*)', score_section)
                if score_match:
                    score = float(score_match.group(1))
                    # Handle scores > 1 (convert from a 0-10 to a 0-1 scale if needed)
                    if score > 1:
                        score = score / 10.0
                    return min(max(score, 0), 1), explanation_section

            # Strategy 2: look for a score in any line mentioning "skor"/"score"
            for line in content.split('\n'):
                if any(keyword in line.lower() for keyword in ['skor', 'score']):
                    score_match = re.search(r'(\d+\.?\d*)', line)
                    if score_match:
                        score = float(score_match.group(1))
                        if score > 1:
                            score = score / 10.0
                        return min(max(score, 0), 1), content

            # Strategy 3: look for bare decimal numbers anywhere in the content
            numbers = re.findall(r'\b(0\.\d+|1\.0|1)\b', content)
            if numbers:
                score = float(numbers[0])
                return min(max(score, 0), 1), content

            return None, content
        except Exception as e:
            logger.error(f"Error parsing evaluation response: {e}")
            return None, f"Parsing error: {str(e)}"
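    # Illustrative parses (assumed inputs, not captured from a real model):
    #   "Skor: 8\nPenjelasan: Cukup akurat."  -> (0.8, "Cukup akurat.")  # strategy 1, 0-10 rescaled
    #   "The score: 0.72 overall"             -> (0.72, full text)      # strategy 2
    #   "confidence is about 0.5"             -> (0.5, full text)       # strategy 3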
    def create_evaluation_prompts(self, question: str, answer: str,
                                  ground_truth: str, docs_text: str) -> Dict[str, str]:
        """Create Indonesian evaluation prompts optimized for PNP context"""
        return {
            "correctness": f"""Evaluasi KEBENARAN jawaban tentang Politeknik Negeri Padang:

PERTANYAAN: {question}

JAWABAN REFERENSI (BENAR): {ground_truth}

JAWABAN YANG DIEVALUASI: {answer}

Berikan skor antara 0.00 (sepenuhnya salah) hingga 1.00 (sepenuhnya benar).

Pertimbangkan:
- Apakah fakta dalam jawaban sesuai dengan referensi?
- Apakah informasi yang diberikan akurat tentang PNP?
- Apakah ada kesalahan atau ketidaksesuaian data?

Format wajib:
Skor:
Penjelasan:
""",
            "relevance": f"""Evaluasi RELEVANSI jawaban terhadap pertanyaan tentang PNP:

PERTANYAAN: {question}

JAWABAN: {answer}

Berikan skor antara 0.00 (tidak relevan sama sekali) hingga 1.00 (sangat relevan dan tepat sasaran).

Pertimbangkan:
- Apakah jawaban menjawab langsung pertanyaan yang diajukan?
- Apakah informasi yang diberikan sesuai dengan konteks PNP?
- Apakah jawaban fokus dan tidak keluar topik?

Format wajib:
Skor:
Penjelasan:
""",
            "groundedness": f"""Evaluasi apakah jawaban BERDASAR pada dokumen yang tersedia:

DOKUMEN/KONTEKS: {docs_text}

JAWABAN: {answer}

Berikan skor antara 0.00 (tidak berdasar pada dokumen) hingga 1.00 (sepenuhnya berdasar pada dokumen).

Pertimbangkan:
- Apakah klaim dalam jawaban didukung oleh dokumen?
- Apakah ada informasi yang dibuat-buat atau tidak ada di dokumen?
- Apakah jawaban konsisten dengan fakta dalam dokumen?

Format wajib:
Skor:
Penjelasan:
""",
            "retrieval": f"""Evaluasi KUALITAS PENGAMBILAN dokumen untuk menjawab pertanyaan:

PERTANYAAN: {question}

DOKUMEN YANG DIAMBIL: {docs_text}

Berikan skor antara 0.00 (dokumen tidak relevan) hingga 1.00 (dokumen sangat relevan dan lengkap).

Pertimbangkan:
- Apakah dokumen mengandung informasi yang dibutuhkan untuk menjawab pertanyaan?
- Apakah ada informasi penting yang tidak terambil?
- Apakah dokumen yang diambil sesuai dengan topik pertanyaan?

Format wajib:
Skor:
Penjelasan:
"""
        }

    def evaluate_single_item(self, item: Dict[str, Any], index: int, total: int) -> EvaluationResult:
        """Evaluate a single question-answer pair with progress tracking"""
        question = item["question"]
        ground_truth = item["ground_truth"]

        logger.info(f"[{index}/{total}] Evaluating: {question[:60]}...")

        # Generate RAG response
        rag_output = self.ask(question)
        answer = rag_output['answer']
        docs = rag_output.get('source_documents', [])
        response_time = rag_output.get('response_time', 0)

        # Prepare documents text (truncate each document to 500 characters)
        docs_text = "\n\n".join([
            f"Dokumen {i+1}:\n{doc.page_content[:500]}{'...' if len(doc.page_content) > 500 else ''}"
            for i, doc in enumerate(docs)
        ])

        # Create evaluation prompts
        prompts = self.create_evaluation_prompts(question, answer, ground_truth, docs_text)

        # Run evaluations with progress logging
        logger.debug("Evaluating correctness...")
        correctness_score, correctness_exp = self.evaluate_score(prompts["correctness"], "correctness")

        logger.debug("Evaluating relevance...")
        relevance_score, relevance_exp = self.evaluate_score(prompts["relevance"], "relevance")

        logger.debug("Evaluating groundedness...")
        groundedness_score, groundedness_exp = self.evaluate_score(prompts["groundedness"], "groundedness")

        logger.debug("Evaluating retrieval...")
        retrieval_score, retrieval_exp = self.evaluate_score(prompts["retrieval"], "retrieval")

        result = EvaluationResult(
            question=question,
            answer=answer,
            ground_truth=ground_truth,
            correctness_score=correctness_score,
            correctness_explanation=correctness_exp,
            relevance_score=relevance_score,
            relevance_explanation=relevance_exp,
            groundedness_score=groundedness_score,
            groundedness_explanation=groundedness_exp,
            retrieval_score=retrieval_score,
            retrieval_explanation=retrieval_exp,
            retrieved_docs=[doc.page_content for doc in docs],
            response_time=response_time,
            timestamp=datetime.now().isoformat()
        )

        logger.info(
            f"[{index}/{total}] Scores - C:{correctness_score:.3f} R:{relevance_score:.3f} "
            f"G:{groundedness_score:.3f} Ret:{retrieval_score:.3f}"
        )
        return result

    @traceable(name="Evaluasi RAG Sahabat-AI")
    def run_evaluation(self, dataset: List[Dict[str, Any]],
                       save_path: str = "pnp_evaluation_results.json") -> Dict[str, Any]:
        """Run evaluation on PNP dataset"""
        results = []
        total_items = len(dataset)
        start_time = time.time()

        logger.info(f"Starting PNP RAG evaluation of {total_items} items...")
        logger.info(f"Results will be saved to: {save_path}")

        for i, item in enumerate(dataset, 1):
            try:
                result = self.evaluate_single_item(item, i, total_items)
                results.append(result)

                # Save intermediate results every 5 items
                if i % 5 == 0:
                    self._save_intermediate_results(results, save_path, i, total_items)

                # Add a randomized delay to avoid rate limiting
                delay = random.uniform(2, 4)
                logger.debug(f"Waiting {delay:.1f}s before next evaluation...")
                time.sleep(delay)
            except Exception as e:
                logger.error(f"Error evaluating item {i}: {e}")
                continue

        # Calculate final statistics
        summary = self.calculate_summary_stats(results)
        summary['evaluation_time'] = time.time() - start_time
        summary['timestamp'] = datetime.now().isoformat()

        # Save final results
        self.save_results(results, summary, save_path)

        logger.info(f"PNP RAG evaluation completed in {summary['evaluation_time']:.1f}s!")
        self._print_summary(summary)

        return {"results": results, "summary": summary}
    def _save_intermediate_results(self, results: List[EvaluationResult],
                                   save_path: str, current: int, total: int):
        """Save intermediate results during evaluation"""
        intermediate_path = save_path.replace('.json', f'_intermediate_{current}of{total}.json')
        partial_summary = self.calculate_summary_stats(results)
        self.save_results(results, partial_summary, intermediate_path)
        logger.info(f"Intermediate results saved ({current}/{total}): {intermediate_path}")

    def calculate_summary_stats(self, results: List[EvaluationResult]) -> Dict[str, Any]:
        """Calculate comprehensive summary statistics"""
        if not results:
            return {"error": "No results to analyze"}

        def safe_stats(scores):
            if not scores:
                return {"mean": 0, "median": 0, "std": 0, "min": 0, "max": 0}
            return {
                "mean": statistics.mean(scores),
                "median": statistics.median(scores),
                "std": statistics.stdev(scores) if len(scores) > 1 else 0,
                "min": min(scores),
                "max": max(scores)
            }

        correctness_scores = [r.correctness_score for r in results]
        relevance_scores = [r.relevance_score for r in results]
        groundedness_scores = [r.groundedness_score for r in results]
        retrieval_scores = [r.retrieval_score for r in results]
        response_times = [r.response_time for r in results]

        # Overall performance score (weighted average:
        # correctness 30%, relevance 25%, groundedness 25%, retrieval 20%)
        overall_scores = [
            (r.correctness_score * 0.3 +
             r.relevance_score * 0.25 +
             r.groundedness_score * 0.25 +
             r.retrieval_score * 0.2)
            for r in results
        ]

        return {
            "total_evaluations": len(results),
            "correctness": safe_stats(correctness_scores),
            "relevance": safe_stats(relevance_scores),
            "groundedness": safe_stats(groundedness_scores),
            "retrieval": safe_stats(retrieval_scores),
            "overall_performance": safe_stats(overall_scores),
            "performance_metrics": {
                "avg_response_time": statistics.mean(response_times),
                "median_response_time": statistics.median(response_times),
                "total_response_time": sum(response_times),
                "fastest_response": min(response_times),
                "slowest_response": max(response_times)
            },
            "score_distribution": {
                "excellent_count": len([s for s in overall_scores if s >= 0.8]),
                "good_count": len([s for s in overall_scores if 0.6 <= s < 0.8]),
                "fair_count": len([s for s in overall_scores if 0.4 <= s < 0.6]),
                "poor_count": len([s for s in overall_scores if s < 0.4])
            }
        }

    def save_results(self, results: List[EvaluationResult], summary: Dict[str, Any], save_path: str):
        """Save evaluation results with enhanced formatting"""
        output = {
            "metadata": {
                "evaluation_system": "PNP RAG Evaluator",
                "dataset": "Politeknik Negeri Padang Q&A",
                "model": "fauziisyrinapridal/sahabat-ai-v1:latest",
                "evaluator": "llama3",
                "timestamp": datetime.now().isoformat()
            },
            "summary": summary,
            "detailed_results": [
                {
                    "id": i + 1,
                    "question": r.question,
                    "answer": r.answer,
                    "ground_truth": r.ground_truth,
                    "scores": {
                        "correctness": round(r.correctness_score, 3),
                        "relevance": round(r.relevance_score, 3),
                        "groundedness": round(r.groundedness_score, 3),
                        "retrieval": round(r.retrieval_score, 3),
                        "overall": round((r.correctness_score * 0.3 +
                                          r.relevance_score * 0.25 +
                                          r.groundedness_score * 0.25 +
                                          r.retrieval_score * 0.2), 3)
                    },
                    "explanations": {
                        "correctness": r.correctness_explanation,
                        "relevance": r.relevance_explanation,
                        "groundedness": r.groundedness_explanation,
                        "retrieval": r.retrieval_explanation
                    },
                    "metrics": {
                        "response_time": round(r.response_time, 2),
                        "retrieved_docs_count": len(r.retrieved_docs),
                        "timestamp": r.timestamp
                    }
                }
                for i, r in enumerate(results)
            ]
        }

        with open(save_path, 'w', encoding='utf-8') as f:
            json.dump(output, f, ensure_ascii=False, indent=2)

        logger.info(f"Results saved to {save_path}")

    def _print_summary(self, summary: Dict[str, Any]):
        """Print formatted evaluation summary"""
        print("\n" + "=" * 60)
        print(" PNP RAG EVALUATION SUMMARY")
        print("=" * 60)
        print(f"Total Evaluations: {summary['total_evaluations']}")
        print(f"Evaluation Time: {summary.get('evaluation_time', 0):.1f}s")

        print("\nSCORE BREAKDOWN:")
        print(f"  Correctness:  {summary['correctness']['mean']:.3f} ± {summary['correctness']['std']:.3f}")
        print(f"  Relevance:    {summary['relevance']['mean']:.3f} ± {summary['relevance']['std']:.3f}")
        print(f"  Groundedness: {summary['groundedness']['mean']:.3f} ± {summary['groundedness']['std']:.3f}")
        print(f"  Retrieval:    {summary['retrieval']['mean']:.3f} ± {summary['retrieval']['std']:.3f}")
        print(f"  Overall:      {summary['overall_performance']['mean']:.3f} ± {summary['overall_performance']['std']:.3f}")

        print("\nPERFORMANCE DISTRIBUTION:")
        dist = summary['score_distribution']
        total = summary['total_evaluations']
        print(f"  Excellent (≥0.8): {dist['excellent_count']} ({dist['excellent_count'] / total * 100:.1f}%)")
        print(f"  Good (0.6-0.8):   {dist['good_count']} ({dist['good_count'] / total * 100:.1f}%)")
        print(f"  Fair (0.4-0.6):   {dist['fair_count']} ({dist['fair_count'] / total * 100:.1f}%)")
        print(f"  Poor (<0.4):      {dist['poor_count']} ({dist['poor_count'] / total * 100:.1f}%)")

        print("\nRESPONSE TIME STATS:")
        perf = summary['performance_metrics']
        print(f"  Average: {perf['avg_response_time']:.2f}s")
        print(f"  Median:  {perf['median_response_time']:.2f}s")
        print(f"  Range:   {perf['fastest_response']:.2f}s - {perf['slowest_response']:.2f}s")
        print("=" * 60)
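
# A minimal sketch (not part of the evaluator itself) showing how a saved
# results file can be read back. The field names follow the schema written by
# PNPRAGEvaluator.save_results above; the default path is the one used in
# main() below.
def load_overall_mean(path: str = "pnp_rag_evaluation_results.json") -> float:
    """Return the mean overall score from a saved results file."""
    with open(path, encoding='utf-8') as f:
        data = json.load(f)
    return data["summary"]["overall_performance"]["mean"]
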
# PNP Evaluation Dataset
evaluation_dataset = [
    # {
    #     "question": "How is the education system implemented at Politeknik Negeri Padang?",
    #     "ground_truth": "Sistem pendidikan yang diterapkan di Politeknik adalah dengan menggabungkan pendidikan teoritis, praktek (terapan) di Laboratorium dan praktek industry. Pelaksanaan praktik di industri dilakukan oleh mahasiswa selama satu semester untuk menambah wawasan, pengalaman dan pengembangan ilmu guna membentuk tenaga ahli yang terampil dan profesional."
    # },
    # {
    #     "question": "What are the courses included in the curriculum of Politeknik Negeri Padang?",
    #     "ground_truth": "Kurikulum Pendidikan telah disusun berbasis kompetensi dengan kelompok mata kuliah sebagai berikut : - Mata Kuliah Pengembangan Kepribadian (MPK) - Mata Kuliah Keimuan dan Keterampilan (MKK) - Mata Kuliah Berkarya (MKB) - Mata Kuliah Berkehidupan Bermasyarakat (MBB)"
    # },
    # {
    #     "question": "How does Politeknik Negeri Padang support the tridharma mission of higher education?",
    #     "ground_truth": "Politeknik Negeri Padang dalam menjalankan misi tridharma perguruan tinggi didukung oleh tenaga pendidik dan tenaga kependidikan yang profesional pada bidangnya. Jumlah dan kualifikasi staf tersebut berdasarkan keadaan Desember 2017 sebagai berikut : - Tenaga Pendidik : S1 = 14 orang, S2 = 256 orang, S3 = 21 orang (Yang sedang menempuh S3 = 7 orang, Yang sedang menempuh S2 = 5 orang) - Tenaga Kependidikan : SD = 5 orang, SMP = 4 orang, SLTA = 71 orang, D3 = 25 orang, S1 = 54 orang, S2 = 15 orang."
    # },
    # {
    #     "question": "How does Politeknik Negeri Padang provide internet access for students?",
    #     "ground_truth": "Politeknik Negeri Padang telah memiliki Anjungan Internet Mandiri (AIM) yang dapat diakses oleh mahasiswa secara gratis, yang tersedia pada titik-titik strategis. Juga tersedia kawasan hot spot area di sekitar kampus sehingga mahasiswa dapat memanfaatkan internet dengan bebas menggunakan laptop/PC."
    # },
    # {
    #     "question": "What are some examples of Politeknik Negeri Padang's cooperation with industry?",
    #     "ground_truth": "PT. Siemens Indonesia, PT. Toyota Aichi Takaoua Japan, PT. PLN, PT. INTI, Futaba Rashi Siisha Kusho Japan, PT. Sintom, PT. Krakatau Steel, Komatssu Shinge Koumuten, PT. PAL Indonesia, PT. Hexindo, Taishurin Co. Ltd Fukuoaka Japan, PT. Texmaco Perkasa, PT. LEN Industri, PT. Toyota Astra Motor, PT. Indah Kiat, PT. Trakindo Utama, BTN."
    # },
    {
        "question": "How does Politeknik Negeri Padang assist students in terms of achievement and economics?",
        "ground_truth": "Tersedia bantuan untuk sekitar 800 mahasiswa setiap tahunnya. Beasiswa yang diterima antara lain: - Beasiswa Peningkatan Prestasi Akademik (PPA), - Beasiswa Kerja Mahasiswa (BKM), - Beasiswa Bantuan Belajar Mahasiswa (BBM), - Beasiswa TPSDP, - Beasiswa Kredit Bantuan Belajar Mahasiswa (KBBM), - Beasiswa Depertemen Hankam (ABRI), - Beasiswa PT. Toyota Astra, - Beasiswa ORBIT (ICMI), - Beasiswa Supersemar."
    },
    {
        "question": "What is the accreditation status of study programs at Politeknik Negeri Padang?",
        "ground_truth": "Program studi di Politeknik Negeri Padang memiliki status akreditasi yang bervariasi seperti Baik, Baik Sekali, hingga Unggul. Contohnya, Teknik Mesin (D3) terakreditasi Baik Sekali hingga 2029, Teknik Manufaktur (D4) terakreditasi Unggul hingga 2028, dan Teknik Sipil (D3) terakreditasi A hingga 2026. Setiap program memiliki SK dan sertifikat akreditasi resmi."
    },
    {
        "question": "How is the new student admission process at Politeknik Negeri Padang?",
        "ground_truth": "Penerimaan mahasiswa baru di Politeknik Negeri Padang dilakukan melalui berbagai jalur seleksi seperti SNBT, SNMPN, dan kelas kerjasama. Tersedia brosur dan informasi detail melalui situs http://penerimaan.pnp.ac.id. Program studi Teknik Alat Berat misalnya memiliki kelas kerjasama dengan PT Trakindo Utama. Jadwal seleksi dan pengumuman dapat diakses secara daring."
    },
    {
        "question": "What forms of cooperation does Politeknik Negeri Padang conduct?",
        "ground_truth": "Politeknik Negeri Padang menjalin kerjasama dengan industri, pemerintah, BUMN, dan asosiasi profesi baik dalam negeri maupun luar negeri. Bentuk kerjasama mencakup rekrutmen, prakerin (praktik kerja industri), kunjungan industri, bimbingan karir, serta pembuatan MoU. Tujuannya untuk menjaga mutu lulusan dan penyaluran SDM."
    },
    {
        "question": "Who is the current director of Politeknik Negeri Padang?",
        "ground_truth": "Direktur Politeknik Negeri Padang adalah Dr. Ir. Surfa Yondri, S.T., S.ST., M.Kom. Wakil Direktur Bidang Akademik adalah Ir. Revalin Herdianto, ST., M.Sc., Ph.D. Pimpinan lainnya antara lain Nasrullah, ST., M.T., dan Sarmiadi, S.E., M.M. yang memiliki pengalaman panjang dalam jabatan struktural di kampus."
    },
    {
        "question": "What is the brief history of the establishment of Politeknik Negeri Padang?",
        "ground_truth": "Politeknik Negeri Padang didirikan pada tahun 1987 sebagai salah satu dari 17 politeknik pertama di Indonesia. Awalnya bernama Politeknik Engineering Universitas Andalas. Pada 1997 menjadi Politeknik Universitas Andalas lalu berubah menjadi Politeknik Negeri Padang. Saat ini memiliki 32 program studi dari jenjang D3 hingga Magister Terapan."
    }
]
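
# A quick offline sanity check for the score parser (a sketch for manual use):
# it bypasses __init__ via __new__, so no vector store, Replicate token, or
# Ollama server is needed, since _parse_evaluation_response only inspects the
# text it is given.
def _demo_parse_evaluation_response():
    evaluator = PNPRAGEvaluator.__new__(PNPRAGEvaluator)  # skip heavy __init__
    sample = "Skor: 0.85\nPenjelasan: Jawaban sesuai dengan referensi."
    score, explanation = evaluator._parse_evaluation_response(sample)
    print(f"score={score}, explanation={explanation}")  # score=0.85, ...
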

def main():
    """Main execution function"""
    try:
        client = Client()  # Initialize the LangSmith client
        print(f"LangSmith Project: {os.getenv('LANGCHAIN_PROJECT')}")

        # Initialize evaluator
        evaluator = PNPRAGEvaluator()

        # Run evaluation
        results = evaluator.run_evaluation(
            evaluation_dataset,
            "pnp_rag_evaluation_results.json"
        )

        print("\nEvaluation completed successfully!")
        print("Results saved to: pnp_rag_evaluation_results.json")
        return results
    except Exception as e:
        logger.error(f"Evaluation failed: {e}")
        raise


if __name__ == "__main__":
    main()