import hashlib
import json
import os
import random
import re
import unicodedata
from collections import Counter
from datetime import datetime
from typing import Dict, List, Tuple


class DataQualityManager:
    """Data Quality Management and Standardization"""

    def __init__(self):
        self.quality_report = {}
        self.cleaned_data = []

    def clean_text(self, text: str) -> str:
        """Clean and normalize Thai text."""
        if not text or not isinstance(text, str):
            return ""

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Collapse excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Normalize Thai characters to canonical composed form
        text = unicodedata.normalize('NFC', text)

        # Clean Thai-specific issues
        text = re.sub(r'ๆ+', 'ๆ', text)  # Collapse repeated repetition marks (mai yamok)
        text = re.sub(r'[฿๏๎๚๛]', '', text)  # Remove special Thai symbols

        # Remove URLs
        text = re.sub(r'https?://\S+', '', text)

        # Remove email addresses
        text = re.sub(r'\S+@\S+', '', text)

        return text.strip()

    def detect_duplicates(self, data: List[Dict]) -> Tuple[List[int], Dict]:
        """Detect duplicate records by hashing their input content."""
        seen_hashes = {}
        duplicates = []

        for i, record in enumerate(data):
            # Hash the prompt/input content so exact duplicates collide
            content = str(record.get('prompt', '')) + str(record.get('input', ''))
            content_hash = hashlib.md5(content.encode()).hexdigest()

            if content_hash in seen_hashes:
                duplicates.append(i)
            else:
                seen_hashes[content_hash] = i

        return duplicates, {
            "total_duplicates": len(duplicates),
            "unique_records": len(seen_hashes),
        }

    def validate_completeness(self, data: List[Dict]) -> Dict:
        """Check that each record carries the required fields."""
        required_fields = ['id', 'prompt', 'generated_text']
        incomplete_records = []

        for i, record in enumerate(data):
            missing_fields = [field for field in required_fields if not record.get(field)]
            if missing_fields:
                incomplete_records.append({
                    'record_id': i,
                    'missing_fields': missing_fields,
                })

        return {
            "incomplete_records": len(incomplete_records),
            "details": incomplete_records[:10],  # Show only the first 10
        }

    def analyze_quality_metrics(self, data: List[Dict]) -> Dict:
        """Analyze basic quality metrics across the dataset."""
        if not data:
            return {}

        # Text length statistics
        prompt_lengths = [len(str(record.get('prompt', ''))) for record in data]
        output_lengths = [len(str(record.get('generated_text', ''))) for record in data]

        # Language detection (simplified check for Thai characters)
        thai_pattern = re.compile(r'[ก-๏]')
        thai_records = sum(
            1 for record in data
            if thai_pattern.search(str(record.get('generated_text', '')))
        )

        # Model distribution
        model_usage = Counter(record.get('model_used', 'unknown') for record in data)

        return {
            "total_records": len(data),
            "avg_prompt_length": sum(prompt_lengths) / len(prompt_lengths),
            "avg_output_length": sum(output_lengths) / len(output_lengths),
            "thai_content_ratio": thai_records / len(data),
            "model_distribution": dict(model_usage),
            "length_stats": {
                "min_prompt": min(prompt_lengths),
                "max_prompt": max(prompt_lengths),
                "min_output": min(output_lengths),
                "max_output": max(output_lengths),
            },
        }
    def standardize_format(self, data: List[Dict], task_type: str) -> Tuple[List[Dict], Dict]:
        """Standardize records into a common instruction-style schema."""
        standardized_data = []

        for i, record in enumerate(data):
            # Create a standardized record
            std_record = {
                "id": f"{task_type}_{i + 1:06d}",
                "task_type": task_type,
                "input": self.clean_text(str(record.get('prompt', ''))),
                "output": self.clean_text(str(record.get('generated_text', ''))),
                "metadata": {
                    "model_used": record.get('model_used', 'unknown'),
                    "generation_time": record.get('generation_time'),
                    "language": "th",
                    "domain": self._detect_domain(record),
                    "quality_score": self._calculate_quality_score(record),
                },
            }

            # Preserve the original source data when available
            if record.get('original_data'):
                std_record["metadata"]["source_data"] = record['original_data']

            standardized_data.append(std_record)

        # Create dataset-level metadata
        dataset_metadata = {
            "dataset_name": f"thai_{task_type}_dataset",
            "created_at": datetime.now().isoformat(),
            "version": "1.0.0",
            "language": "th",
            "task_type": task_type,
            "total_samples": len(standardized_data),
            "license": "CC-BY-4.0",
            "description": (
                f"High-quality Thai {task_type} dataset generated "
                f"using multiple language models"
            ),
        }

        return standardized_data, dataset_metadata

    def _detect_domain(self, record: Dict) -> str:
        """Detect the domain/topic of a record via keyword matching."""
        text = str(record.get('prompt', '')) + str(record.get('generated_text', ''))
        text_lower = text.lower()

        # Simple keyword-based domain detection
        if any(word in text_lower for word in ['สุขภาพ', 'โรค', 'ยา', 'แพทย์']):
            return "health"
        elif any(word in text_lower for word in ['การศึกษา', 'โรงเรียน', 'นักเรียน']):
            return "education"
        elif any(word in text_lower for word in ['เทคโนโลยี', 'คอมพิวเตอร์', 'โปรแกรม']):
            return "technology"
        elif any(word in text_lower for word in ['การเงิน', 'ธนาคาร', 'เงิน']):
            return "finance"
        return "general"

    def _calculate_quality_score(self, record: Dict) -> float:
        """Calculate a heuristic quality score for a record (0-1)."""
        score = 1.0
        prompt = str(record.get('prompt', ''))
        output = str(record.get('generated_text', ''))

        # Penalize very short outputs
        if len(output) < 10:
            score -= 0.3

        # Penalize repetitive or empty content (low unique-word ratio)
        words = output.split()
        if not words or len(set(words)) / len(words) < 0.7:
            score -= 0.2

        # Penalize incomplete responses
        if output.endswith('...') or len(output) < len(prompt) * 0.5:
            score -= 0.2

        # Bonus for Thai content
        if re.search(r'[ก-๏]', output):
            score += 0.1

        return max(0.0, min(1.0, score))

    def create_data_splits(self, data: List[Dict], train_ratio: float = 0.8,
                           val_ratio: float = 0.1, test_ratio: float = 0.1) -> Dict:
        """Create train/validation/test splits; the test split takes the remainder."""
        if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-9:
            raise ValueError("Split ratios must sum to 1.0")

        # Shuffle a copy so the caller's list is left untouched
        shuffled_data = data.copy()
        random.shuffle(shuffled_data)

        total = len(shuffled_data)
        train_end = int(total * train_ratio)
        val_end = train_end + int(total * val_ratio)

        return {
            "train": shuffled_data[:train_end],
            "validation": shuffled_data[train_end:val_end],
            "test": shuffled_data[val_end:],
        }
    def generate_dataset_card(self, metadata: Dict, quality_metrics: Dict) -> str:
        """Generate a dataset card (README) in markdown format."""
        card_template = f"""# Thai {metadata['task_type'].title()} Dataset

## Dataset Description

This is a high-quality Thai {metadata['task_type']} dataset created using multiple state-of-the-art language models.

## Dataset Information

- **Language**: Thai (th)
- **Task Type**: {metadata['task_type']}
- **Total Samples**: {metadata['total_samples']:,}
- **Created**: {metadata['created_at']}
- **Version**: {metadata['version']}
- **License**: {metadata['license']}

## Quality Metrics

- **Average Prompt Length**: {quality_metrics.get('avg_prompt_length', 0):.1f} characters
- **Average Output Length**: {quality_metrics.get('avg_output_length', 0):.1f} characters
- **Thai Content Ratio**: {quality_metrics.get('thai_content_ratio', 0):.2%}

## Model Distribution

{self._format_model_distribution(quality_metrics.get('model_distribution', {}))}

## Data Fields

- `id`: Unique identifier for each sample
- `task_type`: Type of NLP task
- `input`: Input prompt or question
- `output`: Generated response or answer
- `metadata`: Additional information including model used, quality score, etc.

## Usage

```python
from datasets import load_dataset

dataset = load_dataset("path/to/dataset")
```

## License

This dataset is released under the {metadata['license']} license.

## Citation

If you use this dataset in your research, please cite:

```bibtex
@dataset{{thai_{metadata['task_type']}_dataset,
  title={{Thai {metadata['task_type'].title()} Dataset}},
  author={{Thai Dataset Generator}},
  year={{{datetime.now().year}}},
  version={{{metadata['version']}}},
  url={{https://github.com/your-repo/thai-dataset}}
}}
```
"""
        return card_template

    def _format_model_distribution(self, model_dist: Dict) -> str:
        """Format the model distribution as a markdown list."""
        if not model_dist:
            return "No model distribution data available."
        lines = [f"- **{model}**: {count:,} samples" for model, count in model_dist.items()]
        return "\n".join(lines)


def export_to_huggingface_format(data_splits: Dict, metadata: Dict, output_dir: str):
    """Export dataset splits in a Hugging Face-compatible JSONL layout."""
    os.makedirs(output_dir, exist_ok=True)

    # Save each split as JSON Lines
    for split_name, split_data in data_splits.items():
        with open(os.path.join(output_dir, f"{split_name}.jsonl"), 'w', encoding='utf-8') as f:
            for record in split_data:
                f.write(json.dumps(record, ensure_ascii=False) + '\n')

    # Save dataset info
    dataset_info = {
        "dataset_name": metadata["dataset_name"],
        "config_name": "default",
        "version": {"version_str": metadata["version"]},
        "description": metadata["description"],
        "homepage": "",
        "license": metadata["license"],
        "features": {
            "id": {"dtype": "string"},
            "task_type": {"dtype": "string"},
            "input": {"dtype": "string"},
            "output": {"dtype": "string"},
            "metadata": {"dtype": "string"},
        },
        "splits": {
            split_name: {"name": split_name, "num_examples": len(split_data)}
            for split_name, split_data in data_splits.items()
        },
    }

    with open(os.path.join(output_dir, "dataset_info.json"), 'w', encoding='utf-8') as f:
        json.dump(dataset_info, f, ensure_ascii=False, indent=2)

    print(f"Dataset exported to {output_dir}")
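

# --- Usage sketch (illustrative only) ---
# A minimal end-to-end run of the pipeline above on two synthetic records.
# The sample records, task_type "qa", and the "output/thai_qa" path are
# assumptions for demonstration, not values from the original source.
if __name__ == "__main__":
    sample_data = [
        {"id": "1", "prompt": "อธิบายการทำงานของคอมพิวเตอร์",
         "generated_text": "คอมพิวเตอร์ทำงานโดยรับข้อมูล ประมวลผล และแสดงผลลัพธ์",
         "model_used": "model_a"},
        {"id": "2", "prompt": "อธิบายการทำงานของคอมพิวเตอร์",
         "generated_text": "คอมพิวเตอร์ทำงานโดยรับข้อมูล ประมวลผล และแสดงผลลัพธ์",
         "model_used": "model_b"},
    ]

    manager = DataQualityManager()

    # Identical prompts collide on the content hash, so record 2 is flagged
    duplicates, dup_stats = manager.detect_duplicates(sample_data)
    print(f"Duplicates found: {dup_stats['total_duplicates']}")

    # Standardize, analyze, split, and export
    standardized, metadata = manager.standardize_format(sample_data, task_type="qa")
    metrics = manager.analyze_quality_metrics(sample_data)
    splits = manager.create_data_splits(standardized)
    export_to_huggingface_format(splits, metadata, "output/thai_qa")

    # Write the dataset card next to the exported splits
    with open("output/thai_qa/README.md", "w", encoding="utf-8") as f:
        f.write(manager.generate_dataset_card(metadata, metrics))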