Alimubariz124 committed on
Commit 23e0fa2 · verified · 1 Parent(s): 186dc8d

Update main.py

Files changed (1)
  1. main.py +112 -124
main.py CHANGED
@@ -1,15 +1,40 @@
  import whisper
-
- def transcribe_audio(audio_path):
-     model = whisper.load_model("base")
-     result = model.transcribe(audio_path)
-     return result["text"]
  from pyannote.audio import Pipeline

- def perform_speaker_diarization(audio_path):
-     pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", use_auth_token="YOUR_HUGGINGFACE_TOKEN")
      diarization = pipeline(audio_path)

      speaker_segments = []
      for turn, _, speaker in diarization.itertracks(yield_label=True):
          speaker_segments.append({
@@ -19,145 +44,108 @@ def perform_speaker_diarization(audio_path):
          })
      return speaker_segments

- from textblob import TextBlob
- from sklearn.feature_extraction.text import CountVectorizer
- from sklearn.decomposition import LatentDirichletAllocation
- from collections import Counter
- import nltk
- from nltk.corpus import stopwords
- import spacy
-
- nltk.download('stopwords')
- nltk.download('punkt')
-
- # Load spaCy model for NER
- nlp = spacy.load("en_core_web_sm")
-
- def analyze_sentiment(text):
-     blob = TextBlob(text)
-     return blob.sentiment.polarity, blob.sentiment.subjectivity
-
- def extract_keywords(text, top_n=5):
-     stop_words = set(stopwords.words("english"))
-     words = nltk.word_tokenize(text.lower())
-     filtered_words = [word for word in words if word.isalnum() and word not in stop_words]
-     word_counts = Counter(filtered_words)
-     return word_counts.most_common(top_n)

- def perform_topic_modeling(text, num_topics=5, num_words=10):
-     vectorizer = CountVectorizer(stop_words="english", max_features=1000)
-     X = vectorizer.fit_transform([text])
-     lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
-     lda.fit(X)

-     topics = []
-     for idx, topic in enumerate(lda.components_):
-         top_words = [vectorizer.get_feature_names_out()[i] for i in topic.argsort()[:-num_words - 1:-1]]
-         topics.append(f"Topic {idx + 1}: {' '.join(top_words)}")
-     return topics

- def extract_entities(text):
-     doc = nlp(text)
-     entities = [(ent.text, ent.label_) for ent in doc.ents]
-     return entities

- def parse_query(query):
-     doc = nlp(query)
-     keywords = [token.text.lower() for token in doc if token.is_alpha and not token.is_stop]
-     intent = None
-
-     if any(word in ["how many", "count"] for word in keywords):
-         intent = "count"
-     elif any(word in ["list", "show me"] for word in keywords):
-         intent = "list"
-     elif any(word in ["sentiment", "polarity", "subjectivity"] for word in keywords):
-         intent = "sentiment"
-     elif any(word in ["theme", "topic", "main"] for word in keywords):
-         intent = "topic"
-     elif any(word in ["keyword", "common"] for word in keywords):
-         intent = "keyword"
-     elif any(word in ["entity", "name", "person", "organization"] for word in keywords):
-         intent = "ner"
-     return intent, keywords

- def answer_question(query, qa_df):
-     intent, keywords = parse_query(query)
-
-     if intent == "count":
-         filtered = qa_df[qa_df["Transcript"].str.contains("|".join(keywords), case=False)]
-         return f"{len(filtered)} responses contain the keywords: {', '.join(keywords)}."
-
-     elif intent == "list":
-         filtered = qa_df[qa_df["Transcript"].str.contains("|".join(keywords), case=False)]["Transcript"].tolist()
-         return "\n".join(filtered) if filtered else "No matching responses found."

-     elif intent == "sentiment":
-         avg_polarity = qa_df["Sentiment_Polarity"].mean()
-         avg_subjectivity = qa_df["Sentiment_Subjectivity"].mean()
-         return f"Average Polarity: {avg_polarity:.2f}, Average Subjectivity: {avg_subjectivity:.2f}"
-
-     elif intent == "topic":
-         all_text = " ".join(qa_df["Transcript"])
-         topics = perform_topic_modeling(all_text)
-         return "\n".join(topics)
-
-     elif intent == "keyword":
-         all_text = " ".join(qa_df["Transcript"])
-         keywords = extract_keywords(all_text)
-         return ", ".join([word for word, count in keywords])
-
-     elif intent == "ner":
-         all_text = " ".join(qa_df["Transcript"])
-         entities = extract_entities(all_text)
-         return "\n".join([f"{entity} ({label})" for entity, label in entities])

      else:
-         return "I'm not sure how to answer that. Try asking about counts, lists, sentiment, topics, keywords, or entities."
-
- import gradio as gr
-
- # Global variables to store processed data
- qa_df = None
-
- def process_audio(audio_path):
-     global qa_df

-     # Step 1: Transcribe audio
-     transcription = transcribe_audio(audio_path)

-     # Step 2: Perform speaker diarization
-     speaker_segments = perform_speaker_diarization(audio_path)

-     # Step 3: Analyze text
-     sentiment_polarity, sentiment_subjectivity = analyze_sentiment(transcription)
-     topics = perform_topic_modeling(transcription)
-     keywords = extract_keywords(transcription)
-     entities = extract_entities(transcription)

-     # Create a DataFrame
-     qa_df = pd.DataFrame({
-         "Speaker": [seg["speaker"] for seg in speaker_segments],
-         "Transcript": [transcription],
-         "Sentiment_Polarity": [sentiment_polarity],
-         "Sentiment_Subjectivity": [sentiment_subjectivity],
-         "Topics": [topics],
-         "Keywords": [keywords],
-         "Entities": [entities]
-     })

-     return "Audio processed successfully!"

  # Gradio Interface
  with gr.Blocks() as demo:
-     gr.Markdown("# Advanced Audio Analysis App")
      audio_input = gr.Audio(label="Upload Audio File")
      process_button = gr.Button("Process Audio")
      status_output = gr.Textbox(label="Status")

      question_input = gr.Textbox(label="Ask a Question")
      answer_output = gr.Textbox(label="Answer")

-     process_button.click(process_audio, inputs=audio_input, outputs=status_output)
-     question_input.submit(answer_question, inputs=[question_input], outputs=answer_output)

  demo.launch()

+ import gradio as gr
  import whisper
  from pyannote.audio import Pipeline
+ from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
+ from bertopic import BERTopic
+ from sklearn.feature_extraction.text import CountVectorizer
+ import pandas as pd
+ import torch
+
+ # Load Whisper model for transcription
+ whisper_model = whisper.load_model("large")
+
+ # Load translation pipeline
+ translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ar-en")
+
+ # Load summarization pipeline
+ summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
+
+ # Load LLaMA model and tokenizer for chat-based interaction
+ llama_model_name = "meta-llama/Llama-2-7b-chat"
+ tokenizer = AutoTokenizer.from_pretrained(llama_model_name)
+ model = AutoModelForCausalLM.from_pretrained(llama_model_name)
+
+ # Global variables to store processed data
+ aligned_transcription = []
+ translated_text = ""
+ topics = []
+ summary = ""

+ def perform_speaker_diarization(audio_path, hf_token="YOUR_HUGGINGFACE_TOKEN"):
+     # Load the speaker diarization pipeline
+     pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", use_auth_token=hf_token)
+
+     # Apply diarization
      diarization = pipeline(audio_path)

+     # Extract speaker segments
      speaker_segments = []
      for turn, _, speaker in diarization.itertracks(yield_label=True):
          speaker_segments.append({
          })
      return speaker_segments

+ def transcribe_with_speaker_diarization(audio_path, hf_token="YOUR_HUGGINGFACE_TOKEN"):
+     # Step 1: Perform speaker diarization
+     speaker_segments = perform_speaker_diarization(audio_path, hf_token)
+
+     # Step 2: Transcribe audio
+     transcription = whisper_model.transcribe(audio_path)
+
+     # Step 3: Align transcription with speaker segments
+     aligned_transcription = []
+     for segment in transcription["segments"]:
+         start_time = segment["start"]
+         end_time = segment["end"]
+         text = segment["text"]
+
+         # Find the corresponding speaker
+         speaker = "Unknown"
+         for spk_segment in speaker_segments:
+             if spk_segment["start"] <= start_time <= spk_segment["end"]:
+                 speaker = spk_segment["speaker"]
+                 break
+
+         aligned_transcription.append({
+             "speaker": speaker,
+             "start": start_time,
+             "end": end_time,
+             "text": text
+         })
+
+     return aligned_transcription

+ def translate_text(text, src_lang="ar", tgt_lang="en"):
+     translated = translator(text, max_length=400)
+     return translated[0]["translation_text"]

+ def perform_topic_modeling(texts):
+     vectorizer = CountVectorizer(stop_words="english")
+     topic_model = BERTopic(vectorizer_model=vectorizer, calculate_probabilities=True)
+     topics, probs = topic_model.fit_transform(texts)
+     return topic_model.get_topic_info(), topic_model.visualize_topics()

+ def summarize_text(text, max_length=150, min_length=30):
+     summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
+     return summary[0]["summary_text"]

+ def generate_response(prompt, max_tokens=150):
+     inputs = tokenizer(prompt, return_tensors="pt")
+     outputs = model.generate(inputs["input_ids"], max_length=max_tokens)
+     response = tokenizer.decode(outputs[0], skip_special_tokens=True)
+     return response

+ def process_audio(audio_path, language="auto", hf_token="YOUR_HUGGINGFACE_TOKEN"):
+     global aligned_transcription, translated_text, topics, summary

+     # Step 1: Transcribe audio with speaker diarization
+     aligned_transcription = transcribe_with_speaker_diarization(audio_path, hf_token)

+     # Step 2: Translate text if needed
+     full_text = " ".join([seg["text"] for seg in aligned_transcription])
+     if language != "en":
+         translated_text = translate_text(full_text, src_lang="ar", tgt_lang="en")
      else:
+         translated_text = full_text

+     # Step 3: Perform topic modeling
+     topics, _ = perform_topic_modeling([translated_text])

+     # Step 4: Summarize text
+     summary = summarize_text(translated_text)

+     return "Audio processed successfully!"
+
+ def answer_question(query):
+     global aligned_transcription, translated_text, topics, summary

+     # Combine context for the LLM
+     context = f"""
+     Transcription: {translated_text}
+     Topics: {topics.to_string(index=False)}
+     Summary: {summary}
+     """

+     # Generate response using LLM
+     response = generate_response(f"{context}\nQuestion: {query}")
+     return response

  # Gradio Interface
  with gr.Blocks() as demo:
+     gr.Markdown("# Advanced Audio Analysis App with Speaker Diarization")
      audio_input = gr.Audio(label="Upload Audio File")
+     language_input = gr.Dropdown(choices=["auto", "en", "ar"], label="Language", value="auto")
+     hf_token_input = gr.Textbox(label="Hugging Face Token (for pyannote.audio)", type="password")
      process_button = gr.Button("Process Audio")
      status_output = gr.Textbox(label="Status")

      question_input = gr.Textbox(label="Ask a Question")
      answer_output = gr.Textbox(label="Answer")

+     process_button.click(
+         process_audio,
+         inputs=[audio_input, language_input, hf_token_input],
+         outputs=status_output
+     )
+     question_input.submit(answer_question, inputs=question_input, outputs=answer_output)

  demo.launch()