import os
import re
import tempfile
import logging
from typing import Optional, Dict, Tuple, Any
from pathlib import Path

import gradio as gr
import torch
import whisper
import fitz  # PyMuPDF
import docx
from bs4 import BeautifulSoup
import markdown2
import chardet
from transformers import pipeline, MarianTokenizer, AutoModelForSeq2SeqLM

# -------------------------------
# Configuration & Logging Setup
# -------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Hugging Face access token, read from the environment variable / Space secret named "hffff"
HF_TOKEN = os.getenv("hffff")

# Language Pair Models
MODELS: Dict[Tuple[str, str], Dict[str, str]] = {
    ("English", "Wolof"): {"model_name": "LocaleNLP/localenlp-eng-wol-0.03", "tag": ">>wol<<"},
    ("Wolof", "English"): {"model_name": "LocaleNLP/localenlp-wol-eng-0.03", "tag": ">>eng<<"},
    ("English", "Hausa"): {"model_name": "LocaleNLP/localenlp-eng-hau-0.01", "tag": ">>hau<<"},
    ("Hausa", "English"): {"model_name": "LocaleNLP/localenlp-hau-eng-0.01", "tag": ">>eng<<"},
    ("English", "Darija"): {"model_name": "LocaleNLP/english_darija", "tag": ">>dar<<"},
}

SUPPORTED_LANGUAGES = ["English", "Wolof", "Hausa", "Darija"]
INPUT_MODES = ["Text", "Audio", "File"]
SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".html", ".htm", ".md", ".srt", ".txt"]
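
# Example (illustrative only, not executed): each supported direction is looked up by a
# (source, target) tuple, and the "tag" is prepended to every sentence before translation.
#   cfg = MODELS[("English", "Wolof")]
#   cfg["model_name"]  # -> "LocaleNLP/localenlp-eng-wol-0.03"
#   cfg["tag"]         # -> ">>wol<<"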

# -------------------------------
# Model Manager
# -------------------------------
class ModelManager:
    """Manages loading and caching of translation and transcription models."""

    def __init__(self):
        self.translation_pipeline = None
        self.whisper_model = None

    def load_translation_model(self, src_lang: str, tgt_lang: str) -> Tuple[Any, str]:
        key = (src_lang, tgt_lang)
        if key not in MODELS:
            raise ValueError(f"Unsupported language pair: {src_lang} -> {tgt_lang}")
        config = MODELS[key]
        model_name = config["model_name"]
        lang_tag = config["tag"]
        if self.translation_pipeline is None or self.translation_pipeline.model.config._name_or_path != model_name:
            logger.info(f"Loading translation model: {model_name}")
            model = AutoModelForSeq2SeqLM.from_pretrained(model_name, token=HF_TOKEN).to(DEVICE)
            tokenizer = MarianTokenizer.from_pretrained(model_name, token=HF_TOKEN)
            self.translation_pipeline = pipeline(
                "translation",
                model=model,
                tokenizer=tokenizer,
                device=0 if DEVICE.type == "cuda" else -1
            )
        return self.translation_pipeline, lang_tag

    def load_whisper_model(self) -> Any:
        if self.whisper_model is None:
            logger.info("Loading Whisper base model...")
            self.whisper_model = whisper.load_model("base")
        return self.whisper_model
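
# Example (illustrative only, not executed by the app): the manager caches the pipeline,
# so repeated calls for the same direction reuse the already loaded model. The sample
# sentence is made up.
#   manager = ModelManager()
#   pipe, tag = manager.load_translation_model("English", "Wolof")
#   print(pipe(f"{tag} Good morning")[0]["translation_text"])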

# -------------------------------
# File Processing Utilities
# -------------------------------
def extract_text_from_file(file_path: str) -> str:
    """Extracts text from various file types."""
    ext = Path(file_path).suffix.lower()
    content = Path(file_path).read_bytes()
    if ext == ".pdf":
        with fitz.open(stream=content, filetype="pdf") as doc:
            return "\n".join(page.get_text() for page in doc)
    elif ext == ".docx":
        doc = docx.Document(file_path)
        return "\n".join(p.text for p in doc.paragraphs)
    elif ext in (".html", ".htm"):
        return BeautifulSoup(content.decode("utf-8", errors="ignore"), "html.parser").get_text()
    elif ext == ".md":
        html = markdown2.markdown(content.decode("utf-8", errors="ignore"))
        return BeautifulSoup(html, "html.parser").get_text()
    elif ext == ".srt":
        decoded = content.decode("utf-8", errors="ignore")
        # Strip SRT cue numbers and timestamp lines, keeping only the subtitle text
        return re.sub(r"\d+\n\d{2}:\d{2}:\d{2},\d{3} --> .*?\n", "", decoded)
    elif ext in (".txt", ".text"):
        encoding = chardet.detect(content)["encoding"]
        return content.decode(encoding or "utf-8", errors="ignore")
    else:
        raise ValueError(f"Unsupported file type: {ext}")
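
# Example (illustrative only; "notes.pdf" is a hypothetical path):
#   text = extract_text_from_file("notes.pdf")
#   print(text[:200])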

# -------------------------------
# Translation Logic
# -------------------------------
def translate_text(text: str, src_lang: str, tgt_lang: str, model_manager: ModelManager) -> str:
    """Translates input text using the specified language pair."""
    pipe, tag = model_manager.load_translation_model(src_lang, tgt_lang)
    paragraphs = text.splitlines()
    translated_output = []
    with torch.no_grad():
        for para in paragraphs:
            if not para.strip():
                translated_output.append("")
                continue
            # Rough sentence split on ". ", then prepend the target-language tag expected by the model
            sentences = [s.strip() for s in para.split(". ") if s.strip()]
            formatted = [f"{tag} {sentence}" for sentence in sentences]
            results = pipe(
                formatted,
                max_length=5000,
                num_beams=5,
                early_stopping=True,
                no_repeat_ngram_size=3,
                repetition_penalty=1.5,
                length_penalty=1.2
            )
            translated_sentences = [r["translation_text"].capitalize() for r in results]
            translated_output.append(". ".join(translated_sentences))
    return "\n".join(translated_output)

# -------------------------------
# Audio Transcription
# -------------------------------
def transcribe_audio(file_path: str, model_manager: ModelManager) -> str:
    """Transcribes an audio file using Whisper."""
    model = model_manager.load_whisper_model()
    result = model.transcribe(file_path)
    return result["text"]
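
# Example (illustrative only; "speech.wav" is a hypothetical file):
#   english_text = transcribe_audio("speech.wav", model_manager)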

# -------------------------------
# Main Processing Function
# -------------------------------
def process_input(
    mode: str,
    src_lang: str,
    text_input: str,
    audio_path: Optional[str],
    file_obj: Optional[Any]
) -> str:
    """Processes input based on the selected mode."""
    if mode == "Text":
        return text_input
    elif mode == "Audio":
        if src_lang != "English":
            raise ValueError("Audio input must be in English.")
        if not audio_path:
            raise ValueError("No audio file uploaded.")
        return transcribe_audio(audio_path, model_manager)
    elif mode == "File":
        if not file_obj:
            raise ValueError("No file uploaded.")
        # Depending on the Gradio version and component settings, gr.File may pass either a
        # filepath string or an object exposing the path via .name
        file_path = file_obj if isinstance(file_obj, str) else file_obj.name
        return extract_text_from_file(file_path)
    return ""

# -------------------------------
# Gradio UI Logic
# -------------------------------
model_manager = ModelManager()

def update_visibility(mode: str) -> Dict[str, Any]:
    """Update visibility of input components based on the selected mode."""
    return {
        input_text: gr.update(visible=(mode == "Text")),
        audio_input: gr.update(visible=(mode == "Audio")),
        file_input: gr.update(visible=(mode == "File")),
        extracted_text: gr.update(value="", visible=True),
        output_text: gr.update(value="")
    }
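
# Illustrative note (not executed): the returned dict maps each output component to a
# gr.update(...) describing its new state, e.g. selecting "Audio" yields
#   update_visibility("Audio")[audio_input]  # -> gr.update(visible=True)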

def handle_process(
    mode: str,
    src_lang: str,
    text_input: str,
    audio_path: Optional[str],
    file_obj: Optional[Any]
) -> Tuple[str, str]:
    """Handles the initial processing of input."""
    try:
        extracted = process_input(mode, src_lang, text_input, audio_path, file_obj)
        return extracted, ""
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return "", f"Error: {str(e)}"

def handle_translate(extracted_text: str, src_lang: str, tgt_lang: str) -> str:
    """Handles translation of the extracted text."""
    if not extracted_text.strip():
        return "No input text to translate."
    try:
        return translate_text(extracted_text, src_lang, tgt_lang, model_manager)
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return f"Translation error: {str(e)}"

# -------------------------------
# Gradio Interface
# -------------------------------
with gr.Blocks(title="LocaleNLP Translator") as demo:
    gr.Markdown("## 🌍 LocaleNLP Multi-language Translator")
    gr.Markdown("Supports translation between English, Wolof, Hausa, and Darija. Audio input must be in English.")

    with gr.Row():
        input_mode = gr.Radio(choices=INPUT_MODES, label="Input Type", value="Text")
        # Darija is output-only: MODELS has no Darija -> X direction, so it is excluded from input languages
        input_lang = gr.Dropdown(choices=SUPPORTED_LANGUAGES[:-1], label="Input Language", value="English")
        output_lang = gr.Dropdown(choices=SUPPORTED_LANGUAGES, label="Output Language", value="Wolof")

    input_text = gr.Textbox(label="Enter Text", lines=10, visible=True)
    audio_input = gr.Audio(label="Upload Audio (.wav, .mp3, .m4a)", type="filepath", visible=False)
    file_input = gr.File(file_types=SUPPORTED_FILE_TYPES, label="Upload Document", visible=False)
    extracted_text = gr.Textbox(label="Extracted / Transcribed Text", lines=10, interactive=False)
    translate_button = gr.Button("Translate")
    output_text = gr.Textbox(label="Translated Text", lines=10, interactive=False)

    input_mode.change(
        fn=update_visibility,
        inputs=input_mode,
        outputs=[input_text, audio_input, file_input, extracted_text, output_text]
    )
    translate_button.click(
        fn=handle_process,
        inputs=[input_mode, input_lang, input_text, audio_input, file_input],
        outputs=[extracted_text, output_text]
    ).then(
        fn=handle_translate,
        inputs=[extracted_text, input_lang, output_lang],
        outputs=output_text
    )

if __name__ == "__main__":
    demo.launch()
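
# Example (illustrative alternative): to bind to all interfaces on a fixed port, the app
# could instead be launched with explicit server options, e.g.
#   demo.launch(server_name="0.0.0.0", server_port=7860)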