Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple, Set | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import pickle | |
| import re | |
| from dotenv import load_dotenv | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Runtime configuration; each value can be overridden through the environment.
DB_PATH = os.getenv("TEMPLATE_DB_PATH", "templates/medical_templates.pkl")
GPT_MODEL = os.getenv("GPT_MODEL", "gpt-5")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# LangChain is optional: HAS_LANGCHAIN records whether the import succeeded so
# that GPT initialisation can be skipped when it did not.
try:
    from langchain_openai import ChatOpenAI
    from langchain.prompts import ChatPromptTemplate
    HAS_LANGCHAIN = True
except ImportError:
    HAS_LANGCHAIN = False
    logging.warning("LangChain not available")

# Reuse the parser and template record classes from the existing code.
try:
    from template_db_creation import MedicalTemplateParser, TemplateInfo
except ImportError:
    logging.error("template_db_creation module not found")
    # Bind placeholders so the names used in annotations further down still
    # resolve and this module stays importable. Loading a database will then
    # fail explicitly when MedicalTemplateParser is called.
    MedicalTemplateParser = None
    TemplateInfo = None
@dataclass
class SectionMatch:
    """Result of matching one template section against a transcription.

    The class is built with keyword arguments elsewhere in this module, so it
    must be a dataclass for the generated ``__init__`` to exist.

    Attributes:
        section_name: Name of the template section.
        confidence: Confidence attached to the extracted content.
        extracted_content: Text taken from the transcription for the section.
        can_fill: Whether the extracted content is enough to fill the section.
        missing_info: Messages describing what is missing when the section
            cannot be filled.
    """

    section_name: str
    confidence: float
    extracted_content: str
    can_fill: bool
    missing_info: List[str]
@dataclass
class TemplateMatch:
    """Detailed result of matching one template against a transcription.

    The class is built with keyword arguments elsewhere in this module, so it
    must be a dataclass for the generated ``__init__`` to exist.

    Attributes:
        template_id: Identifier of the template in the loaded database.
        template_info: Template record the scores refer to.
        overall_score: Weighted combination of the individual scores.
        type_match_score: Document-type agreement score.
        physician_match_score: Physician agreement score.
        center_match_score: Medical-centre agreement score.
        content_match_score: Content agreement score.
        filename_match_score: Filename agreement score.
        fillability_score: Share of template sections that can be filled.
        section_matches: Per-section match details keyed by section name.
        confidence_level: Textual confidence label.
        can_be_filled: Whether the template can be at least partly filled.
        filling_percentage: Fillable share expressed as a percentage.
        missing_critical_info: Names of sections that could not be filled.
        extracted_data: Extracted text keyed by section name.
        filename_indicators: Indicators shared by both filenames.
    """

    template_id: str
    # Project types are quoted (forward references) so that defining this
    # class does not require them to be resolvable at class-creation time.
    template_info: "TemplateInfo"
    overall_score: float
    type_match_score: float
    physician_match_score: float
    center_match_score: float
    content_match_score: float
    filename_match_score: float
    fillability_score: float
    section_matches: Dict[str, "SectionMatch"]
    confidence_level: str
    can_be_filled: bool
    filling_percentage: float
    missing_critical_info: List[str]
    extracted_data: Dict[str, str]
    filename_indicators: List[str]
@dataclass
class FilenameAnalysis:
    """Medical information extracted from a document filename.

    The class is built with keyword arguments elsewhere in this module, so it
    must be a dataclass for the generated ``__init__`` to exist.

    Attributes:
        original_filename: Filename exactly as it was passed in.
        medical_keywords: Keywords found in the filename.
        document_type_indicators: Imaging types suggested by the filename.
        specialty_indicators: Medical specialties suggested by the filename.
        center_indicators: Medical centres suggested by the filename.
        anatomical_regions: Anatomical regions suggested by the filename.
        procedure_type: Detected imaging procedure, or ``None``.
        confidence_score: Heuristic confidence of the analysis.
    """

    original_filename: str
    medical_keywords: List[str]
    document_type_indicators: List[str]
    specialty_indicators: List[str]
    center_indicators: List[str]
    anatomical_regions: List[str]
    procedure_type: Optional[str]
    confidence_score: float
| class TemplateMatcher: | |
| """Système de matching entre transcriptions et templates médicaux""" | |
def __init__(self, database_path: str = None):
    """Set up the matcher and, when possible, load an existing database.

    Args:
        database_path: Path to a saved template database. When it is not
            given or does not exist on disk, a warning is logged and no
            templates are loaded.
    """
    # Every component starts unset; the initialisers below fill in what they can.
    self.parser = None
    self.llm = None
    self.content_analyzer = None
    self.section_extractor = None
    self.filename_analyzer = None

    self._initialize_filename_keywords()
    self._initialize_gpt()

    database_available = bool(database_path) and os.path.exists(database_path)
    if not database_available:
        logging.warning("Base de données non trouvée ou non spécifiée")
        return
    self.load_database(database_path)
def _initialize_filename_keywords(self):
    """Build the keyword tables used to analyse filenames.

    Stores a two-level mapping ``category -> sub-category -> keywords`` in
    ``self.filename_keywords``. Insertion order is kept as written because
    the filename analysis iterates these dictionaries in order.
    """
    # Imaging examination types
    imaging = {
        "irm": ["irm", "mri", "resonance"],
        "scanner": ["scanner", "tdm", "ct", "tomodensitometrie"],
        "echographie": ["echo", "echographie", "doppler", "ultrasound"],
        "radiologie": ["radio", "radiologie", "rx", "xray"],
        "pet": ["pet", "tep", "scintigraphie"],
        "mammographie": ["mammo", "mammographie", "breast"],
    }

    # Medical specialties
    specialties = {
        "cardiologie": ["cardio", "coeur", "heart", "ecg", "holter"],
        "neurologie": ["neuro", "brain", "cerveau", "eeg"],
        "orthopedic": ["ortho", "os", "bone", "fracture"],
        "gynecologie": ["gyneco", "utérus", "ovaire", "pelvien"],
        "urologie": ["uro", "vessie", "rein", "prostate"],
        "pneumologie": ["pneumo", "poumon", "thorax", "resp"],
        "gastro": ["gastro", "abdomen", "foie", "intestin"],
    }

    # Anatomical regions
    anatomy = {
        "tete": ["tete", "crane", "cerebral", "encephale"],
        "thorax": ["thorax", "poumon", "coeur", "mediastin"],
        "abdomen": ["abdomen", "foie", "rate", "pancreas"],
        "pelvis": ["pelvis", "pelvien", "utérus", "ovaire", "vessie"],
        "membres": ["membre", "bras", "jambe", "genou", "epaule"],
        "rachis": ["rachis", "colonne", "vertebral", "lombaire"],
    }

    # Procedure types
    procedures = {
        "arteriel": ["arteriel", "artere", "vasculaire"],
        "veineux": ["veineux", "veine", "phlebo"],
        "fonctionnel": ["fonctionnel", "dynamique", "stress"],
        "contraste": ["contraste", "injection", "gadolinium"],
    }

    # Medical centres
    centers = {
        "roseraie": ["roseraie", "rose"],
        "4villes": ["4villes", "quatre"],
        "mstruk": ["mstruk", "struktur"],
        "radioroseraie": ["radioroseraie"],
    }

    self.filename_keywords = {
        "imagerie": imaging,
        "specialites": specialties,
        "anatomie": anatomy,
        "procedures": procedures,
        "centres": centers,
    }
def _initialize_gpt(self):
    """Try to set up the GPT content analyser; leave it unset on any failure.

    Nothing is initialised when LangChain could not be imported or when the
    ``OPENAI_API_KEY`` environment variable is empty. On success ``self.llm``
    holds the chat model and ``self.content_analyzer`` the prompt piped into
    it. If construction raises, the error is logged and ``self.llm`` is reset
    to ``None``.
    """
    if not HAS_LANGCHAIN:
        logging.warning("LangChain non disponible. Utilisation du mode fallback.")
        return

    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        logging.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.")
        return

    try:
        llm_settings = {
            "model": GPT_MODEL,
            "temperature": 0,
            "max_tokens": 4000,
            "api_key": api_key,
        }
        self.llm = ChatOpenAI(**llm_settings)

        # Prompts are kept deliberately simple to avoid potential issues.
        prompt_messages = [
            ("system", "Analyze this medical transcription and return a JSON with document_type, sections, and medical_data."),
            ("human", "Analyze: {transcription}"),
        ]
        content_prompt = ChatPromptTemplate.from_messages(prompt_messages)
        self.content_analyzer = content_prompt | self.llm
        logging.info("✅ GPT initialisé")
    except Exception as e:
        logging.error(f"❌ Erreur lors de l'initialisation GPT: {e}")
        self.llm = None
def analyze_filename(self, filename: str) -> FilenameAnalysis:
    """Extract medical information from a filename.

    Only the keyword-based fallback is used; no GPT call is made.

    Args:
        filename: Name or path of the file to analyse.

    Returns:
        The analysis produced by ``_analyze_filename_fallback``.
    """
    fallback_result = self._analyze_filename_fallback(filename)
    return fallback_result
def _analyze_filename_fallback(self, filename: str) -> FilenameAnalysis:
    """Analyse a filename with the keyword tables only (no GPT).

    The lower-cased base name, without a trailing ``.docx`` / ``.doc`` /
    ``.rtf`` extension, is searched for every keyword in
    ``self.filename_keywords``. Each matching sub-category is recorded once,
    however many of its keywords matched, so the indicator lists contain no
    duplicates and do not inflate the confidence score.

    NOTE(review): matching is by plain substring, so short keywords such as
    "os" or "ct" also match inside longer words — confirm this is intended.

    Args:
        filename: Name or path of the file to analyse.

    Returns:
        A ``FilenameAnalysis`` describing what was found.
    """
    clean_filename = os.path.basename(filename).lower()
    # Strip the extension only at the end of the name; removing it anywhere
    # would glue unrelated parts of the name together.
    for extension in ('.docx', '.doc', '.rtf'):
        if clean_filename.endswith(extension):
            clean_filename = clean_filename[:-len(extension)]
            break

    medical_keywords = []
    document_type_indicators = []
    specialty_indicators = []
    center_indicators = []
    anatomical_regions = []
    procedure_type = None

    def add_unique(bucket, value):
        """Append ``value`` to ``bucket`` unless it is already there."""
        if value not in bucket:
            bucket.append(value)

    # Categories that feed an indicator list; "procedures" keywords are only
    # recorded as medical keywords.
    indicator_buckets = {
        "imagerie": document_type_indicators,
        "specialites": specialty_indicators,
        "anatomie": anatomical_regions,
        "centres": center_indicators,
    }

    for category, subcategories in self.filename_keywords.items():
        bucket = indicator_buckets.get(category)
        for subcat, keywords in subcategories.items():
            matched = [keyword for keyword in keywords if keyword in clean_filename]
            if not matched:
                continue
            for keyword in matched:
                add_unique(medical_keywords, keyword)
            if bucket is not None:
                add_unique(bucket, subcat)
            # The last matching imaging type, in table order, wins.
            if category == "imagerie" and subcat in ("echographie", "irm", "scanner"):
                procedure_type = subcat

    # Heuristic confidence: five distinct elements or more give full confidence.
    total_elements = len(medical_keywords) + len(document_type_indicators) + len(specialty_indicators)
    confidence_score = min(1.0, total_elements / 5.0)

    return FilenameAnalysis(
        original_filename=filename,
        medical_keywords=medical_keywords,
        document_type_indicators=document_type_indicators,
        specialty_indicators=specialty_indicators,
        center_indicators=center_indicators,
        anatomical_regions=anatomical_regions,
        procedure_type=procedure_type,
        confidence_score=confidence_score,
    )
def load_database(self, filepath: str):
    """Load the template database from ``filepath`` into ``self.parser``.

    A ``MedicalTemplateParser`` is created first when no parser is attached
    to the instance yet.

    Args:
        filepath: Location of the saved database.

    Raises:
        Exception: Any error raised while loading is logged and re-raised.
    """
    # NOTE(review): the default database path ends in .pkl; if the parser
    # unpickles the file, only load databases from trusted sources — confirm.
    try:
        if getattr(self, 'parser', None) is None:
            self.parser = MedicalTemplateParser()
        self.parser.load_database(filepath)
        template_count = len(self.parser.templates)
        logging.info(f"✅ Base de données chargée: {template_count} templates")
    except Exception as e:
        logging.error(f"Erreur lors du chargement de la base: {e}")
        raise
def analyze_transcription_detailed(self, transcription: str, transcription_filename: str = "") -> Dict:
    """Analyse a transcription without GPT.

    Args:
        transcription: Text of the medical transcription.
        transcription_filename: Optional name of the transcription file.

    Returns:
        The dictionary produced by ``_fallback_analysis``.
    """
    heuristic_analysis = self._fallback_analysis(transcription, transcription_filename)
    return heuristic_analysis
def _fallback_analysis(self, transcription: str, transcription_filename: str = "") -> Dict:
    """Analyse a transcription with keyword and regex heuristics (no GPT).

    Steps:
      1. Guess the document type from the filename, then let the content
         override that guess when at least two type keywords occur in it.
      2. Extract sections written as ``**Title :** content``. When none are
         found, fall back to a line scan that treats short lines containing
         a section keyword as headings.
      3. Collect measurements and the known medical terms that actually
         occur in the text.

    Sections that map to the same standardised name are concatenated rather
    than overwritten, and text following the colon on a plain-text heading
    line is kept as section content.

    Args:
        transcription: Text of the medical transcription.
        transcription_filename: Optional filename, used for type detection
            and for the ``filename_analysis`` entry.

    Returns:
        A dictionary with the keys ``document_type``, ``identification``,
        ``sections``, ``medical_data`` and ``completeness``, plus
        ``filename_analysis`` when a filename was given.
    """
    text_lower = transcription.lower()

    # --- 1. Document type -------------------------------------------------
    document_types = {
        "compte_rendu_imagerie": ["irm", "scanner", "échographie", "radiologie", "t1", "t2", "doppler", "technique", "plans"],
        "rapport_biologique": ["laboratoire", "analyse", "biologie", "sang", "urine", "sérum"],
        "lettre_medicale": ["lettre", "courrier", "correspondance", "cher confrère"],
        "compte_rendu_consultation": ["consultation", "examen clinique", "patient", "antécédents"]
    }
    detected_type = "compte_rendu_imagerie"  # default when nothing matches

    # First guess from the filename: one keyword is enough.
    if transcription_filename:
        filename_lower = transcription_filename.lower()
        for doc_type, keywords in document_types.items():
            if any(kw in filename_lower for kw in keywords):
                detected_type = doc_type
                break

    # The content takes precedence when it shows at least two keywords.
    for doc_type, keywords in document_types.items():
        if sum(1 for kw in keywords if kw in text_lower) >= 2:
            detected_type = doc_type
            break

    # --- 2. Sections ------------------------------------------------------
    sections = {}

    def store_section(name, content, confidence, keyword):
        """Record a section; append to an existing one with the same name."""
        if not content:
            return
        existing = sections.get(name)
        if existing is None:
            sections[name] = {
                "content": content,
                "confidence": confidence,
                "keywords": [keyword]
            }
            return
        existing["content"] = f"{existing['content']}\n{content}"
        if keyword not in existing["keywords"]:
            existing["keywords"].append(keyword)

    # Section titles mapped to standardised names (first match wins).
    section_mapping = {
        "technique": ["technique", "méthode", "protocole", "acquisition"],
        "résultats": ["résultat", "résultats", "observation", "constatation", "analyse", "description"],
        "conclusion": ["conclusion", "diagnostic", "synthèse", "impression", "avis"],
        "indication": ["indication", "motif", "demande", "contexte"],
        "histoire": ["histoire", "antécédent", "contexte", "clinique"]
    }

    # Sections formatted as "**Title :** content", ending at the next "**".
    markdown_sections = re.findall(r'\*\*(.*?)\s*:\s*\*\*(.*?)(?=\*\*|\Z)', transcription, re.DOTALL | re.IGNORECASE)
    for section_title, section_content in markdown_sections:
        section_title_clean = section_title.strip().lower()
        mapped_section = next(
            (
                standard_name
                for standard_name, variations in section_mapping.items()
                if any(var in section_title_clean for var in variations)
            ),
            None,
        )
        # Use the standardised name, or the original title when none applies.
        final_section_name = mapped_section if mapped_section else section_title_clean
        store_section(final_section_name, section_content.strip(), 0.8, section_title_clean)

    # No markdown section found: scan line by line for heading-like lines.
    if not sections:
        heading_keywords = [
            ("technique", ["technique", "méthode", "protocole"]),
            ("résultats", ["résultat", "observation", "constatation"]),
            ("conclusion", ["conclusion", "diagnostic", "synthèse"])
        ]
        current_section = None
        current_content = []

        for line in transcription.split('\n'):
            line_stripped = line.strip()
            if not line_stripped:
                continue
            line_lower = line_stripped.lower()

            # A heading is a short line containing one of the section keywords.
            heading = None
            if len(line_stripped) < 50:
                heading = next(
                    (
                        section_name
                        for section_name, keywords in heading_keywords
                        if any(kw in line_lower for kw in keywords)
                    ),
                    None,
                )

            if heading is None:
                # Lines before the first heading belong to no section.
                if current_section:
                    current_content.append(line_stripped)
                continue

            # Save the section that just ended before starting the new one.
            if current_section:
                store_section(current_section, '\n'.join(current_content), 0.7, current_section)
            current_section = heading
            # Keep text that follows the colon on the heading line itself,
            # e.g. "Conclusion : examen normal.".
            _, separator, inline_text = line_stripped.partition(':')
            inline_text = inline_text.strip()
            current_content = [inline_text] if separator and inline_text else []

        # Save the last section.
        if current_section:
            store_section(current_section, '\n'.join(current_content), 0.7, current_section)

    # --- 3. Medical data --------------------------------------------------
    def mentioned(terms):
        """Return the terms that actually occur in the transcription."""
        return [term for term in terms if term.lower() in text_lower]

    analysis = {
        "document_type": detected_type,
        "identification": {
            "physician": "Non identifié",
            "center": "Non identifié",
            "service": "Non identifié"
        },
        "sections": sections,
        "medical_data": {
            # Only report known terms that are present in the text instead of
            # returning the same example values for every document.
            "procedures": mentioned(["IRM pelvienne", "T1 Dixon", "T2"]),
            "measurements": re.findall(r'\d+\s*(?:mm|cm|ml)', transcription),
            "diagnoses": mentioned(["endométriome ovarien"]),
            "treatments": [],
            "anatomical_regions": mentioned(["utérus", "ovaire", "pelvis"])
        },
        # Fixed placeholder values; they are not derived from the text.
        "completeness": {
            "score": 0.8,
            "transcription_quality": "good"
        }
    }

    # Add the filename analysis when a filename is available.
    if transcription_filename:
        filename_analysis = self.analyze_filename(transcription_filename)
        analysis["filename_analysis"] = {
            "medical_keywords": filename_analysis.medical_keywords,
            "document_type_indicators": filename_analysis.document_type_indicators,
            "specialty_indicators": filename_analysis.specialty_indicators,
            "anatomical_regions": filename_analysis.anatomical_regions,
            "procedure_type": filename_analysis.procedure_type
        }

    return analysis
def calculate_filename_match_score(self, transcription_filename: str, transcription_analysis: Dict,
                                   template_filename: str) -> Tuple[float, List[str]]:
    """Score how well two filenames agree on document type and specialty.

    For each of the two indicator kinds, the overlap ratio
    (shared / all distinct indicators) is weighted — 0.4 for document types,
    0.25 for specialties — and the weighted values are summed.

    Args:
        transcription_filename: Filename of the transcription.
        transcription_analysis: Not used by this computation.
        template_filename: Filename of the template.

    Returns:
        ``(score, indicators)`` where ``score`` is capped at 1.0 and
        ``indicators`` lists the indicators present in both filenames.
    """
    transcription_side = self.analyze_filename(transcription_filename)
    template_side = self.analyze_filename(template_filename)

    weighted_fields = (
        ("document_type_indicators", 0.4),   # document types
        ("specialty_indicators", 0.25),      # medical specialties
    )

    total = 0.0
    matching_indicators = []
    for field_name, weight in weighted_fields:
        from_transcription = set(getattr(transcription_side, field_name))
        from_template = set(getattr(template_side, field_name))
        shared = from_transcription & from_template
        if not shared:
            continue
        overlap_ratio = len(shared) / max(len(from_transcription | from_template), 1)
        total += overlap_ratio * weight
        matching_indicators.extend(list(shared))

    return min(1.0, total), matching_indicators
def calculate_basic_scores(self, transcription_analysis: Dict, template_info: "TemplateInfo") -> Tuple[float, float, float]:
    """Compute the type, physician and centre scores for one template.

    The type score counts how many keywords expected for the detected
    document type occur in the template's type label, as a ratio doubled and
    capped at 1.0. An unknown document type gets the default 0.3. A template
    whose type is missing or ``None`` is treated as having an empty label
    instead of raising.

    Args:
        transcription_analysis: Analysis dictionary; only ``document_type``
            is read.
        template_info: Template record; only its ``type`` attribute is read.

    Returns:
        ``(type_score, physician_score, center_score)``. The last two are
        the constant neutral value 0.5.
    """
    transcription_type = transcription_analysis.get("document_type", "")
    # Tolerate templates without a usable type label.
    template_type = (getattr(template_info, "type", None) or "").lower()

    type_mappings = {
        "compte_rendu_imagerie": ["irm", "scanner", "échographie", "imagerie", "radiologie"],
        "rapport_biologique": ["laboratoire", "biologie", "analyse"],
        "lettre_medicale": ["lettre", "courrier", "correspondance"],
        "compte_rendu_consultation": ["consultation", "examen"]
    }

    type_score = 0.3  # default for document types without a mapping
    expected_keywords = type_mappings.get(transcription_type)
    if expected_keywords:
        matches = sum(1 for kw in expected_keywords if kw in template_type)
        type_score = min(1.0, matches / len(expected_keywords) * 2)

    # Physician and centre are not compared yet: neutral scores.
    physician_score = 0.5
    center_score = 0.5
    return type_score, physician_score, center_score
def calculate_simple_section_matches(self, transcription: str, transcription_analysis: Dict,
                                     template_info: "TemplateInfo") -> Dict[str, "SectionMatch"]:
    """Find transcription content for every section of a template.

    For each name in ``template_info.detected_sections`` the content is
    looked up in three stages, stopping at the first hit:
      1. the structured sections of ``transcription_analysis`` (direct name
         match, then keyword match with a 10 % confidence penalty);
      2. regex patterns built from the section name, searched in the full
         text;
      3. a text window around the first section keyword found in the text.

    Section names are escaped before being inserted into the patterns, so
    names containing regex metacharacters are matched literally.

    Args:
        transcription: Full transcription text.
        transcription_analysis: Analysis dictionary; its ``sections`` entry
            is read.
        template_info: Template record providing ``detected_sections``.

    Returns:
        A ``SectionMatch`` per template section, keyed by section name.
    """
    section_matches = {}
    transcription_sections = transcription_analysis.get("sections", {})
    transcription_lower = transcription.lower()  # computed once for all sections

    # Common section names in medical transcriptions and their keywords.
    section_mapping = {
        "technique": ["technique", "méthode", "protocole", "acquisition"],
        "résultats": ["résultat", "observation", "constatation", "description", "analyse"],
        "conclusion": ["conclusion", "diagnostic", "synthèse", "impression"],
        "indication": ["indication", "motif", "demande"],
        "histoire": ["histoire", "antécédent", "contexte", "clinique"],
        "examen": ["examen", "exploration", "investigation"]
    }

    for section_name in template_info.detected_sections:
        section_lower = section_name.lower()
        expected_keywords = section_mapping.get(section_lower)
        best_content = ""
        best_confidence = 0.0

        # 1. Structured sections of the analysed transcription.
        for analyzed_section, section_data in transcription_sections.items():
            if not isinstance(section_data, dict):
                continue
            analyzed_lower = analyzed_section.lower()
            content = section_data.get("content", "")
            confidence = section_data.get("confidence", 0.0)

            # Direct match. Empty names are skipped: an empty string is a
            # substring of everything and would match any section.
            if analyzed_lower and section_lower and (
                section_lower in analyzed_lower or analyzed_lower in section_lower
            ):
                best_content = content
                best_confidence = confidence
                break

            # Match through the keyword mapping.
            if expected_keywords and any(kw in analyzed_lower for kw in expected_keywords):
                best_content = content
                best_confidence = confidence * 0.9  # slight penalty for an indirect match
                break

        # 2. Pattern search in the full text.
        if not best_content:
            escaped_name = re.escape(section_lower)
            markdown_patterns = [
                rf"\*\*{escaped_name}[:\s]*\*\*(.*?)(?=\*\*|\n\n|$)",
                rf"{escaped_name}[:\s]+(.*?)(?=\n\*\*|\n\n|$)",
                rf"#{escaped_name}[:\s]+(.*?)(?=\n#|\n\n|$)"
            ]
            for pattern in markdown_patterns:
                matches = re.findall(pattern, transcription, re.IGNORECASE | re.DOTALL)
                # Accept only a non-empty capture; otherwise try the next pattern.
                candidate = next((m.strip() for m in matches if m.strip()), "")
                if candidate:
                    best_content = candidate
                    best_confidence = 0.8
                    break

        # 3. Context window around the first section keyword found.
        if not best_content and expected_keywords:
            for keyword in expected_keywords:
                start_pos = transcription_lower.find(keyword)
                if start_pos == -1:
                    continue
                start = max(0, start_pos - 50)
                end = min(len(transcription), start_pos + 400)
                best_content = transcription[start:end].strip()
                best_confidence = 0.6
                break

        # A section counts as fillable with more than 20 characters of content.
        can_fill = bool(best_content) and len(best_content.strip()) > 20
        missing_info = [] if can_fill else [f"Contenu manquant pour {section_name}"]

        section_matches[section_name] = SectionMatch(
            section_name=section_name,
            confidence=best_confidence,
            extracted_content=best_content,
            can_fill=can_fill,
            missing_info=missing_info
        )

    return section_matches
def calculate_fillability_score(self, section_matches: Dict[str, "SectionMatch"],
                                template_info: "TemplateInfo") -> Tuple[float, float, List[str]]:
    """Compute how much of a template can be filled.

    Args:
        section_matches: Per-section match results keyed by section name.
        template_info: Template record providing ``detected_sections``.

    Returns:
        ``(fillability_score, filling_percentage, missing_sections)``:
        the fillable share of the template's sections, the same value as a
        percentage, and the names of sections that cannot be filled. A
        template without sections yields
        ``(0.0, 0.0, ["Template sans sections"])``.
    """
    # Count each section name once: section_matches is keyed by name, so a
    # name repeated in the template has a single entry there.
    total_sections = len(set(template_info.detected_sections))
    if total_sections == 0:
        return 0.0, 0.0, ["Template sans sections"]

    fillable_sections = sum(1 for match in section_matches.values() if match.can_fill)
    fillability_score = fillable_sections / total_sections
    filling_percentage = fillability_score * 100

    # Sections for which no usable content was found.
    missing_critical = [
        match.section_name for match in section_matches.values()
        if not match.can_fill
    ]
    return fillability_score, filling_percentage, missing_critical
def match_templates(self, transcription: str, transcription_filename: str = "", k: int = 3) -> List["TemplateMatch"]:
    """Score every loaded template against a transcription; return the best ``k``.

    Args:
        transcription: Content of the medical transcription.
        transcription_filename: Name of the transcription file.
        k: Maximum number of results to return (default 3). Values below
            zero are treated as zero.

    Returns:
        Up to ``k`` matches sorted by decreasing overall score. The list is
        empty when no template is loaded. Templates whose scoring raises are
        logged and skipped.
    """
    if not self.parser or not self.parser.templates:
        logging.error("Aucun template chargé")
        return []

    logging.info(f"🔍 Début du matching pour: {transcription_filename}")
    logging.info(f"📄 Contenu de la transcription: {len(transcription.split())} mots")

    # Analyse the transcription once; every template is scored against it.
    analysis = self.analyze_transcription_detailed(transcription, transcription_filename)
    logging.info(f"📊 Type de document détecté: {analysis.get('document_type')}")
    logging.info(f"🔧 Sections détectées: {list(analysis.get('sections', {}).keys())}")

    template_matches = []
    for template_id, template_info in self.parser.templates.items():
        try:
            # Base scores
            type_score, physician_score, center_score = self.calculate_basic_scores(analysis, template_info)

            # Filename score
            filename_score, filename_indicators = self.calculate_filename_match_score(
                transcription_filename, analysis, template_info.filepath
            )

            # Section analysis and fillability
            section_matches = self.calculate_simple_section_matches(transcription, analysis, template_info)
            fillability_score, filling_percentage, missing_critical = self.calculate_fillability_score(section_matches, template_info)

            # Content is not compared yet: neutral score.
            content_score = 0.5

            # Weighted overall score; fillability carries the most weight.
            overall_score = (
                type_score * 0.25 +
                fillability_score * 0.35 +
                filename_score * 0.25 +
                content_score * 0.1 +
                physician_score * 0.025 +
                center_score * 0.025
            )

            # Bonus for templates with at least two fillable sections.
            fillable_count = sum(1 for s in section_matches.values() if s.can_fill)
            if fillable_count >= 2:
                overall_score += 0.1

            if overall_score > 0.7:
                confidence_level = "excellent"
            elif overall_score > 0.5:
                confidence_level = "good"
            elif overall_score > 0.3:
                confidence_level = "fair"
            else:
                confidence_level = "poor"

            # Extracted data: only sections that actually have content.
            extracted_data = {
                section_name: match.extracted_content
                for section_name, match in section_matches.items()
                if match.can_fill and match.extracted_content.strip()
            }

            # A template is fillable with at least one extracted section.
            can_be_filled = len(extracted_data) > 0 or fillability_score > 0.3

            template_match = TemplateMatch(
                template_id=template_id,
                template_info=template_info,
                overall_score=overall_score,
                type_match_score=type_score,
                physician_match_score=physician_score,
                center_match_score=center_score,
                content_match_score=content_score,
                filename_match_score=filename_score,
                fillability_score=fillability_score,
                section_matches=section_matches,
                confidence_level=confidence_level,
                can_be_filled=can_be_filled,
                filling_percentage=filling_percentage,
                missing_critical_info=missing_critical,
                extracted_data=extracted_data,
                filename_indicators=filename_indicators
            )
            template_matches.append(template_match)
        except Exception as e:
            # Deliberate best effort: one faulty template must not abort the
            # ranking of the others.
            logging.warning(f"Erreur lors de l'analyse du template {template_id}: {e}")
            continue

    # Sort by overall score and keep the k best. A negative k would slice
    # from the end of the list, so it is clamped to zero.
    template_matches.sort(key=lambda x: x.overall_score, reverse=True)
    top_matches = template_matches[:max(k, 0)]

    # Log the results.
    logging.info(f"✅ Matching terminé - {len(top_matches)} templates sélectionnés")
    for i, match in enumerate(top_matches, 1):
        logging.info(f"🏆 Template #{i}: {match.template_id}")
        logging.info(f" 📊 Score global: {match.overall_score:.3f}")
        logging.info(f" 📋 Sections remplissables: {len(match.extracted_data)}")
        logging.info(f" 🎯 Niveau de confiance: {match.confidence_level}")
        logging.info(f" 📁 Template: {os.path.basename(match.template_info.filepath)}")

    return top_matches
def print_matching_results(self, matches: List[TemplateMatch]):
    """Print a detailed, human-readable report of the matching results.

    Args:
        matches: Matches to display, in the order they should be shown. An
            empty list prints a single "no result" line.
    """
    if not matches:
        print("❌ Aucun résultat trouvé")
        return

    banner = '=' * 80
    print(f"\n{banner}")
    print(f"🎯 RÉSULTATS DE MATCHING - Top {len(matches)} templates")
    print(f"{banner}")

    # Label and attribute name of each line of the score breakdown.
    score_rows = (
        ("Type", "type_match_score"),
        ("Remplissabilité", "fillability_score"),
        ("Nom de fichier", "filename_match_score"),
        ("Contenu", "content_match_score"),
    )

    for rank, match in enumerate(matches, 1):
        print(f"\n🏆 TEMPLATE #{rank}")
        print(f" 🆔 ID: {match.template_id}")
        print(f" 📊 Score global: {match.overall_score:.3f}")
        template = match.template_info
        print(f" 📁 Fichier: {os.path.basename(template.filepath)}")
        print(f" 👨⚕️ Médecin: {template.medecin}")
        print(f" 🏥 Centre: {getattr(template, 'centre_medical', 'Non spécifié')}")
        print(f" 📝 Type: {template.type}")
        print(f" 🔧 Remplissage possible: {match.filling_percentage:.1f}%")
        print(f" 🎯 Niveau de confiance: {match.confidence_level}")

        print(f" 📈 Détail des scores:")
        for label, attribute in score_rows:
            print(f" - {label}: {getattr(match, attribute):.3f}")

        if match.filename_indicators:
            print(f" 🏷️ Indicateurs fichier: {', '.join(match.filename_indicators)}")

        if match.extracted_data:
            print(f" 📋 Sections extraites ({len(match.extracted_data)}):")
            for section_name, content in match.extracted_data.items():
                # Long contents are shortened to a 100-character preview.
                if len(content) > 100:
                    preview = content[:100] + "..."
                else:
                    preview = content
                print(f" • {section_name}: {preview}")

        if match.missing_critical_info:
            print(f" ⚠️ Sections manquantes: {', '.join(match.missing_critical_info)}")
def main():
    """Run a demonstration matching on a sample transcription.

    Configures logging, loads the database found at ``DB_PATH``, matches the
    built-in sample transcription against it and prints the results.

    Returns:
        The list of matches. The list is empty when the database file is
        missing or when an error occurs, so callers always receive a list.
    """
    # Logging configuration
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Sample transcription
    transcription_filename = "default.73.931915433.rtf_3650535_radiologie.doc"
    transcription_content = """**Technique :** 3 plans T2, diffusion axiale, T2 grand champ et T1 Dixon.
**Résultats :**
* L'utérus est antéversé, antéfléchi, latéralisé à droite, de taille normale pour l'âge.
* L'endomètre est fin, mesurant moins de 2 mm.
* Pas d'adénomyose franche.
* Aspect normal du col utérin et du vagin.
* L'ovaire droit, en position postérieure, mesure 18 x 11 mm avec présence de 4 follicules.
* L'ovaire gauche, en position latéro-utérine, présente un volumineux endométriome de 45 mm, typique en hypersignal T1 Dixon.
* Deuxième endométriome accolé à l'ovaire droit, périphérique, mesurant 13 mm.
* Pas d'épaississement marqué du torus ni des ligaments utéro-sacrés.
* Pas d'autre localisation pelvienne.
* Pas d'épanchement pelvien.
* Pas d'anomalie de la vessie.
* Pas d'adénomégalie pelvienne, pas de dilatation des uretères.
**Conclusion :**
* Endométriome ovarien droit périphérique de 13 mm.
* Endométriome ovarien gauche centro-ovarien de 45 mm."""

    # Path to the template database
    db_path = DB_PATH
    if not os.path.exists(db_path):
        print(f"❌ Base de données non trouvée: {db_path}")
        # Return an empty list, like the error path below, rather than None.
        return []

    try:
        # Initialise the matcher, run the matching and display the results.
        matcher = TemplateMatcher(db_path)
        matches = matcher.match_templates(transcription_content, transcription_filename, k=3)
        matcher.print_matching_results(matches)
        # The results are returned so that another module can use them.
        return matches
    except Exception as e:
        logging.error(f"❌ Erreur: {e}")
        return []


if __name__ == "__main__":
    main()