Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Validation script to compare two CSV files | |
Compares the following columns: ๅบๅ_็ง็ฎ, ๅบๅ_ไธญ็ง็ฎ, ๅบๅ_ๆจๆบๅ็งฐ, ๅบๅ_้ ็ฎๅ, ๅบๅ_ๆจๆบๅไฝ | |
""" | |
import pandas as pd | |
import numpy as np | |
from typing import List, Dict, Tuple, Optional, Any | |
import os | |
from datetime import datetime | |
class FileComparator: | |
def __init__(self, original_file_path: str): | |
""" | |
Initialize comparator with original output file | |
Args: | |
original_file_path: Path to original CSV file | |
""" | |
self.original_file_path = original_file_path | |
self.comparison_columns = [ | |
'ๅบๅ_็ง็ฎ', | |
'ๅบๅ_ไธญ็ง็ฎ', | |
'ๅบๅ_ๆจๆบๅ็งฐ', | |
'ๅบๅ_้ ็ฎๅ', | |
'ๅบๅ_้่จ็จๅไฝ' | |
] | |
def load_original_data(self) -> pd.DataFrame: | |
"""Load original output data""" | |
try: | |
df_original = pd.read_csv(self.original_file_path) | |
print(f"โ Loaded original data: {len(df_original)} rows") | |
return df_original | |
except Exception as e: | |
print(f"โ Error loading original data: {e}") | |
raise | |
def compare_dataframes( | |
self, df_original: pd.DataFrame, df_optimized: pd.DataFrame | |
) -> Dict[str, Any]: | |
""" | |
Compare original vs optimized dataframes | |
Returns: | |
Dict with comparison results | |
""" | |
results: Dict[str, Any] = { | |
"total_rows": len(df_original), | |
"columns_compared": self.comparison_columns, | |
"differences": {}, | |
"summary": {}, | |
} | |
# Check if dataframes have same length | |
if len(df_original) != len(df_optimized): | |
results['length_mismatch'] = { | |
'original': len(df_original), | |
'optimized': len(df_optimized) | |
} | |
print(f"โ Warning: Different number of rows - Original: {len(df_original)}, Optimized: {len(df_optimized)}") | |
# Compare each column | |
for col in self.comparison_columns: | |
if col not in df_original.columns: | |
results['differences'][col] = f"Column not found in original data" | |
continue | |
if col not in df_optimized.columns: | |
results['differences'][col] = f"Column not found in optimized data" | |
continue | |
# Fill NaN values with empty string for comparison | |
original_values = df_original[col].fillna('') | |
optimized_values = df_optimized[col].fillna('') | |
# Compare values | |
differences = original_values != optimized_values | |
diff_count = differences.sum() | |
results['differences'][col] = { | |
'total_differences': int(diff_count), | |
'accuracy_percentage': round((1 - diff_count / len(df_original)) * 100, 2), | |
'different_indices': differences[differences].index.tolist()[:10] # Show first 10 different indices | |
} | |
if diff_count > 0: | |
print(f"โ {col}: {diff_count} differences ({results['differences'][col]['accuracy_percentage']}% accuracy)") | |
else: | |
print(f"โ {col}: Perfect match (100% accuracy)") | |
# Overall summary | |
total_differences = sum([results['differences'][col]['total_differences'] | |
for col in self.comparison_columns | |
if isinstance(results['differences'][col], dict)]) | |
overall_accuracy = round((1 - total_differences / (len(df_original) * len(self.comparison_columns))) * 100, 2) | |
results['summary'] = { | |
'total_differences': total_differences, | |
'overall_accuracy': overall_accuracy, | |
'perfect_match': total_differences == 0 | |
} | |
return results | |
def generate_difference_report( | |
self, | |
df_original: pd.DataFrame, | |
df_optimized: pd.DataFrame, | |
output_file: Optional[str] = None, | |
) -> str: | |
""" | |
Generate detailed difference report | |
Args: | |
df_original: Original dataframe | |
df_optimized: Optimized dataframe | |
output_file: Optional output file path | |
Returns: | |
Report string | |
""" | |
report_lines = [] | |
report_lines.append("=" * 80) | |
report_lines.append(f"FILE COMPARISON REPORT") | |
report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
report_lines.append("=" * 80) | |
# Basic info | |
report_lines.append(f"Original data rows: {len(df_original)}") | |
report_lines.append(f"Compared data rows: {len(df_optimized)}") | |
report_lines.append(f"Columns compared: {', '.join(self.comparison_columns)}") | |
report_lines.append("") | |
# Compare each column | |
for col in self.comparison_columns: | |
if col not in df_original.columns or col not in df_optimized.columns: | |
report_lines.append(f"โ {col}: Column missing") | |
continue | |
original_values = df_original[col].fillna('') | |
optimized_values = df_optimized[col].fillna('') | |
differences = original_values != optimized_values | |
diff_count = differences.sum() | |
accuracy = round((1 - diff_count / len(df_original)) * 100, 2) | |
status = "โ " if diff_count == 0 else "โ ๏ธ" | |
report_lines.append(f"{status} {col}: {diff_count} differences ({accuracy}% accuracy)") | |
if diff_count > 0: | |
# Show some examples of differences | |
diff_indices = differences[differences].index[:5] | |
report_lines.append(f" Sample differences (first 5):") | |
for idx in diff_indices: | |
orig_val = str(original_values.iloc[idx])[:50] | |
opt_val = str(optimized_values.iloc[idx])[:50] | |
report_lines.append(f" Row {idx}: '{orig_val}' โ '{opt_val}'") | |
report_lines.append("") | |
# Overall summary | |
total_comparisons = len(df_original) * len(self.comparison_columns) | |
total_differences = sum([ | |
(df_original[col].fillna('') != df_optimized[col].fillna('')).sum() | |
for col in self.comparison_columns | |
if col in df_original.columns and col in df_optimized.columns | |
]) | |
overall_accuracy = round((1 - total_differences / total_comparisons) * 100, 2) | |
report_lines.append("=" * 80) | |
report_lines.append(f"OVERALL RESULTS:") | |
report_lines.append(f"Total differences: {total_differences}") | |
report_lines.append(f"Overall accuracy: {overall_accuracy}%") | |
report_lines.append(f"Perfect match: {'Yes' if total_differences == 0 else 'No'}") | |
report_lines.append("=" * 80) | |
report_text = "\n".join(report_lines) | |
if output_file: | |
with open(output_file, 'w', encoding='utf-8') as f: | |
f.write(report_text) | |
print(f"๐ Report saved to: {output_file}") | |
return report_text | |
def compare_two_files( | |
self, second_file_path: str, report_file: Optional[str] = None | |
) -> bool: | |
""" | |
Compare two CSV files directly | |
Args: | |
second_file_path: Path to second CSV file to compare | |
report_file: Optional report file path | |
Returns: | |
True if files match perfectly (100% accuracy) | |
""" | |
print("๐ Starting file comparison...") | |
# Load original data | |
df_original = self.load_original_data() | |
# Load second file | |
try: | |
df_second = pd.read_csv(second_file_path) | |
print(f"โ Loaded second file: {len(df_second)} rows") | |
except Exception as e: | |
print(f"โ Error loading second file: {e}") | |
return False | |
# Compare results | |
print("๐ Comparing results...") | |
results = self.compare_dataframes(df_original, df_second) | |
# Generate report | |
if report_file: | |
self.generate_difference_report(df_original, df_second, report_file) | |
# Print summary | |
print("\n" + "="*50) | |
print("๐ฏ COMPARISON SUMMARY") | |
print("="*50) | |
print(f"Overall accuracy: {results['summary']['overall_accuracy']}%") | |
print(f"Perfect match: {'Yes' if results['summary']['perfect_match'] else 'No'}") | |
print(f"Total differences: {results['summary']['total_differences']}") | |
return results['summary']['perfect_match'] | |
def main(): | |
"""Main function to compare two files""" | |
# File paths | |
original_file = "data/outputData_original.csv" | |
second_file = "data/outputData_api.csv" | |
if not os.path.exists(original_file): | |
print(f"โ Original file not found: {original_file}") | |
print("Please ensure the original file exists") | |
return | |
if not os.path.exists(second_file): | |
print(f"โ Second file not found: {second_file}") | |
print("Please ensure the second file exists") | |
return | |
# Initialize comparator | |
comparator = FileComparator(original_file) | |
# Compare files | |
is_match = comparator.compare_two_files(second_file, "file_comparison_report.txt") | |
if is_match: | |
print("๐ Files MATCH perfectly!") | |
else: | |
print("โ Files have differences. Check the report for details.") | |
if __name__ == "__main__": | |
main() | |