import gradio as gr
import whisper
from pyannote.audio import Pipeline
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from bertopic import BERTopic
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import torch
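# Assumed dependencies (not pinned in the original): openai-whisper,
# pyannote.audio, transformers, bertopic, scikit-learn, pandas, gradio, torch.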
# Load Whisper model for transcription
whisper_model = whisper.load_model("large-v2")  # swap in a smaller checkpoint (e.g. "medium") if memory is tight
# Load translation pipeline (Arabic -> English)
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ar-en")
# Load summarization pipeline
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
# Load LLaMA model and tokenizer for chat-based interaction
# (the Transformers-format checkpoint on the Hub carries the "-hf" suffix)
llama_model_name = "meta-llama/Llama-2-7b-chat-hf"
tokenizer = AutoTokenizer.from_pretrained(llama_model_name)
model = AutoModelForCausalLM.from_pretrained(llama_model_name)
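# Note: Llama-2 checkpoints are gated on the Hugging Face Hub; loading them
# requires accepting Meta's license and an authenticated environment
# (e.g. `huggingface-cli login`). On a GPU, passing torch_dtype=torch.float16
# and device_map="auto" to from_pretrained (with `accelerate` installed)
# roughly halves memory use; this script loads the model in full precision.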
# Global variables to store processed data
aligned_transcription = []
translated_text = ""
topics = pd.DataFrame()  # topic info table from BERTopic
summary = ""
def perform_speaker_diarization(audio_path, hf_token="YOUR_HUGGINGFACE_TOKEN"):
    # Load the speaker diarization pipeline (gated model: requires accepting
    # the pyannote license on the Hub and a valid access token)
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization@2.1", use_auth_token=hf_token
    )
    # Apply diarization
    diarization = diarization_pipeline(audio_path)
    # Extract speaker segments
    speaker_segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        speaker_segments.append({
            "start": turn.start,
            "end": turn.end,
            "speaker": speaker
        })
    return speaker_segments
def transcribe_with_speaker_diarization(audio_path, hf_token="YOUR_HUGGINGFACE_TOKEN"):
    # Step 1: Perform speaker diarization
    speaker_segments = perform_speaker_diarization(audio_path, hf_token)
    # Step 2: Transcribe audio
    transcription = whisper_model.transcribe(audio_path)
    # Step 3: Align transcription with speaker segments. Matching on the
    # segment's start time is a heuristic: Whisper segments can straddle
    # speaker turns, in which case the first matching turn wins.
    aligned_transcription = []
    for segment in transcription["segments"]:
        start_time = segment["start"]
        end_time = segment["end"]
        text = segment["text"]
        # Find the speaker whose turn contains this segment's start time
        speaker = "Unknown"
        for spk_segment in speaker_segments:
            if spk_segment["start"] <= start_time <= spk_segment["end"]:
                speaker = spk_segment["speaker"]
                break
        aligned_transcription.append({
            "speaker": speaker,
            "start": start_time,
            "end": end_time,
            "text": text
        })
    return aligned_transcription
def translate_text(text):
    # The Opus-MT checkpoint fixes the direction to Arabic -> English
    translated = translator(text, max_length=400)
    return translated[0]["translation_text"]
def perform_topic_modeling(texts):
    # BERTopic needs several documents to fit, so pass segment-level texts
    vectorizer = CountVectorizer(stop_words="english")
    topic_model = BERTopic(vectorizer_model=vectorizer, calculate_probabilities=True)
    topics, probs = topic_model.fit_transform(texts)
    return topic_model.get_topic_info(), topic_model.visualize_topics()
def summarize_text(text, max_length=150, min_length=30):
    summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
    return summary[0]["summary_text"]
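# Optional helper (not in the original app): facebook/bart-large-cnn truncates
# inputs beyond roughly 1024 tokens, so long transcripts lose most of their
# content. This is a minimal sketch of chunked summarization; splitting on a
# fixed word count is a rough heuristic, not a tokenizer-accurate one.
def summarize_long_text(text, chunk_words=400):
    words = text.split()
    chunks = [" ".join(words[i:i + chunk_words]) for i in range(0, len(words), chunk_words)]
    partial_summaries = [summarize_text(chunk) for chunk in chunks]
    return " ".join(partial_summaries)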
def generate_response(prompt, max_tokens=150):
    inputs = tokenizer(prompt, return_tensors="pt")
    # max_new_tokens bounds only the generated continuation; max_length would
    # also count the prompt tokens and can truncate long contexts to nothing
    outputs = model.generate(**inputs, max_new_tokens=max_tokens)
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return response
def process_audio(audio_path, language="auto", hf_token="YOUR_HUGGINGFACE_TOKEN"):
    global aligned_transcription, translated_text, topics, summary
    # Step 1: Transcribe audio with speaker diarization
    aligned_transcription = transcribe_with_speaker_diarization(audio_path, hf_token)
    # Step 2: Translate text if needed ("auto" is treated as non-English here)
    full_text = " ".join([seg["text"] for seg in aligned_transcription])
    if language != "en":
        translated_text = translate_text(full_text)
    else:
        translated_text = full_text
    # Step 3: Perform topic modeling on segment-level texts
    # (BERTopic cannot fit on a single document)
    segment_texts = [seg["text"] for seg in aligned_transcription]
    topics, _ = perform_topic_modeling(segment_texts)
    # Step 4: Summarize text
    summary = summarize_text(translated_text)
    return "Audio processed successfully!"
def answer_question(query):
    global aligned_transcription, translated_text, topics, summary
    if not translated_text:
        return "Please process an audio file first."
    # Combine context for the LLM
    context = f"""
Transcription: {translated_text}
Topics: {topics.to_string(index=False)}
Summary: {summary}
"""
    # Generate response using the LLM
    response = generate_response(f"{context}\nQuestion: {query}")
    return response
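# Note: the prompt above concatenates the full transcript, topic table, and
# summary; for long recordings this can exceed Llama-2's 4096-token context
# window, so truncating or retrieving only relevant segments may be needed.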
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# Advanced Audio Analysis App with Speaker Diarization")
    # type="filepath" hands the uploaded file's path to process_audio,
    # which is what whisper and pyannote expect
    audio_input = gr.Audio(label="Upload Audio File", type="filepath")
    language_input = gr.Dropdown(choices=["auto", "en", "ar"], label="Language", value="auto")
    hf_token_input = gr.Textbox(label="Hugging Face Token (for pyannote.audio)", type="password")
    process_button = gr.Button("Process Audio")
    status_output = gr.Textbox(label="Status")
    question_input = gr.Textbox(label="Ask a Question")
    answer_output = gr.Textbox(label="Answer")
    process_button.click(
        process_audio,
        inputs=[audio_input, language_input, hf_token_input],
        outputs=status_output
    )
    question_input.submit(answer_question, inputs=question_input, outputs=answer_output)

demo.launch()