Prathamesh Sarjerao Vaidya
completed the project
3e27995
"""
Speaker Verification Module for PS-6 Requirements
This module extends beyond speaker diarization to include speaker identification
and verification capabilities using speaker embeddings and similarity matching.
"""
import numpy as np
import torch
import torchaudio
from typing import Dict, List, Tuple, Optional
import logging
from pathlib import Path
import json
import pickle
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)
class SpeakerVerifier:
"""
Speaker verification system using speaker embeddings for identification
and verification tasks beyond basic diarization.
"""
def __init__(self, device: str = "cpu", cache_dir: str = "./model_cache"):
self.device = device
self.cache_dir = Path(cache_dir)
self.speaker_database = {}
self.embedding_model = None
self.similarity_threshold = 0.7 # Cosine similarity threshold for verification
# Initialize the speaker verification model
self._initialize_model()
def _initialize_model(self):
"""Initialize the speaker embedding model."""
try:
# Try multiple advanced speaker embedding models for enhanced performance
models_to_try = [
"speechbrain/spkrec-ecapa-voxceleb",
"speechbrain/spkrec-xvect-voxceleb",
"microsoft/DialoGPT-medium", # For conversational context
"facebook/wav2vec2-base-960h" # For robust feature extraction
]
for model_name in models_to_try:
try:
if "speechbrain" in model_name:
from speechbrain.pretrained import EncoderClassifier
self.embedding_model = EncoderClassifier.from_hparams(
source=model_name,
savedir=f"{self.cache_dir}/speechbrain_models/{model_name.split('/')[-1]}",
run_opts={"device": self.device}
)
self.model_type = "speechbrain"
logger.info(f"Loaded SpeechBrain model: {model_name}")
break
elif "wav2vec2" in model_name:
from transformers import Wav2Vec2Model, Wav2Vec2Processor
self.embedding_model = Wav2Vec2Model.from_pretrained(model_name)
self.processor = Wav2Vec2Processor.from_pretrained(model_name)
self.model_type = "wav2vec2"
logger.info(f"Loaded Wav2Vec2 model: {model_name}")
break
except Exception as model_error:
logger.warning(f"Failed to load {model_name}: {model_error}")
continue
if self.embedding_model is None:
# Fallback to pyannote
try:
from pyannote.audio import Model
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
self.embedding_model = PretrainedSpeakerEmbedding(
"speechbrain/spkrec-ecapa-voxceleb",
device=torch.device(self.device)
)
self.model_type = "pyannote"
logger.info("Loaded pyannote speaker embedding model")
except Exception as e:
logger.warning(f"Could not load any speaker embedding model: {e}")
logger.info("Falling back to basic speaker verification using diarization embeddings")
self.embedding_model = None
self.model_type = "basic"
except Exception as e:
logger.error(f"Error initializing speaker verification models: {e}")
self.embedding_model = None
self.model_type = "basic"
def extract_speaker_embedding(self, audio_path: str, start_time: float, end_time: float) -> np.ndarray:
"""
Extract speaker embedding from audio segment using advanced models.
Args:
audio_path: Path to audio file
start_time: Start time in seconds
end_time: End time in seconds
Returns:
Speaker embedding vector
"""
try:
if self.embedding_model is not None and self.model_type != "basic":
# Load and segment audio
import librosa
y, sr = librosa.load(audio_path, sr=16000, offset=start_time, duration=end_time-start_time)
if self.model_type == "speechbrain":
# Use SpeechBrain models for enhanced performance
waveform = torch.from_numpy(y).unsqueeze(0)
embedding = self.embedding_model.encode_batch(waveform)
return embedding.squeeze().cpu().numpy()
elif self.model_type == "wav2vec2":
# Use Wav2Vec2 for robust feature extraction
inputs = self.processor(y, sampling_rate=16000, return_tensors="pt", padding=True)
with torch.no_grad():
outputs = self.embedding_model(**inputs)
# Use mean pooling of last hidden states
embedding = outputs.last_hidden_state.mean(dim=1).squeeze()
return embedding.cpu().numpy()
elif self.model_type == "pyannote":
# Use pyannote's speaker embedding model
from pyannote.audio import Audio
audio = Audio(sample_rate=16000, mono=True)
waveform, sample_rate = audio.crop(audio_path, start_time, end_time)
embedding = self.embedding_model({"waveform": waveform, "sample_rate": sample_rate})
return embedding.cpu().numpy().flatten()
else:
# Fallback: Use enhanced basic features
return self._extract_enhanced_features(audio_path, start_time, end_time)
except Exception as e:
logger.error(f"Error extracting speaker embedding: {e}")
return np.zeros(512) # Return zero vector as fallback
def _extract_enhanced_features(self, audio_path: str, start_time: float, end_time: float) -> np.ndarray:
"""Extract enhanced audio features for advanced speaker verification."""
try:
import librosa
# Load audio segment
y, sr = librosa.load(audio_path, sr=16000, offset=start_time, duration=end_time-start_time)
# Enhanced feature extraction for advanced performance
features = []
# 1. MFCC features (13 coefficients + deltas + delta-deltas)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
mfcc_deltas = librosa.feature.delta(mfccs)
mfcc_delta2 = librosa.feature.delta(mfccs, order=2)
features.extend([
np.mean(mfccs, axis=1),
np.mean(mfcc_deltas, axis=1),
np.mean(mfcc_delta2, axis=1)
])
# 2. Spectral features
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)
features.extend([
np.mean(spectral_centroids),
np.mean(spectral_rolloff),
np.mean(spectral_bandwidth),
np.mean(zero_crossing_rate)
])
# 3. Chroma features
chroma = librosa.feature.chroma_stft(y=y, sr=sr)
features.append(np.mean(chroma, axis=1))
# 4. Tonnetz features
tonnetz = librosa.feature.tonnetz(y=y, sr=sr)
features.append(np.mean(tonnetz, axis=1))
# 5. Spectral contrast
contrast = librosa.feature.spectral_contrast(y=y, sr=sr)
features.append(np.mean(contrast, axis=1))
# 6. Rhythm features
tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
features.append([tempo])
# 7. Pitch features
pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
features.append([np.mean(pitches), np.std(pitches)])
# Combine all features
combined_features = np.concatenate(features)
# Normalize features
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
normalized_features = scaler.fit_transform(combined_features.reshape(-1, 1)).flatten()
# Pad or truncate to fixed size
if len(normalized_features) < 512:
normalized_features = np.pad(normalized_features, (0, 512 - len(normalized_features)))
else:
normalized_features = normalized_features[:512]
return normalized_features
except Exception as e:
logger.error(f"Error extracting enhanced features: {e}")
return self._extract_basic_features(audio_path, start_time, end_time)
def _extract_basic_features(self, audio_path: str, start_time: float, end_time: float) -> np.ndarray:
"""Extract basic audio features as fallback embedding."""
try:
import librosa
# Load audio segment
y, sr = librosa.load(audio_path, sr=16000, offset=start_time, duration=end_time-start_time)
# Extract MFCC features
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
# Extract spectral features
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)
# Combine features
features = np.concatenate([
np.mean(mfccs, axis=1),
np.mean(spectral_centroids),
np.mean(spectral_rolloff),
np.mean(zero_crossing_rate)
])
# Pad or truncate to fixed size
if len(features) < 512:
features = np.pad(features, (0, 512 - len(features)))
else:
features = features[:512]
return features
except Exception as e:
logger.error(f"Error extracting basic features: {e}")
return np.zeros(512)
def enroll_speaker(self, speaker_id: str, audio_path: str, segments: List[Tuple[float, float]]) -> bool:
"""
Enroll a speaker in the verification database.
Args:
speaker_id: Unique identifier for the speaker
audio_path: Path to audio file
segments: List of (start_time, end_time) tuples for speaker segments
Returns:
True if enrollment successful, False otherwise
"""
try:
embeddings = []
for start_time, end_time in segments:
embedding = self.extract_speaker_embedding(audio_path, start_time, end_time)
embeddings.append(embedding)
if embeddings:
# Store multiple embeddings for robust verification
self.speaker_database[speaker_id] = {
'embeddings': embeddings,
'mean_embedding': np.mean(embeddings, axis=0),
'audio_path': audio_path,
'enrollment_time': len(embeddings)
}
# Save to disk
self._save_speaker_database()
logger.info(f"Speaker {speaker_id} enrolled successfully with {len(embeddings)} segments")
return True
return False
except Exception as e:
logger.error(f"Error enrolling speaker {speaker_id}: {e}")
return False
def verify_speaker(self, speaker_id: str, audio_path: str, start_time: float, end_time: float) -> Dict:
"""
Verify if an audio segment belongs to a known speaker using advanced methods.
Args:
speaker_id: Speaker to verify against
audio_path: Path to audio file
start_time: Start time of segment
end_time: End time of segment
Returns:
Dictionary with verification results
"""
try:
if speaker_id not in self.speaker_database:
return {
'verified': False,
'confidence': 0.0,
'error': f"Speaker {speaker_id} not found in database"
}
# Extract embedding from test segment
test_embedding = self.extract_speaker_embedding(audio_path, start_time, end_time)
# Get speaker's stored embeddings
speaker_data = self.speaker_database[speaker_id]
stored_embeddings = speaker_data['embeddings']
mean_embedding = speaker_data['mean_embedding']
# Advanced verification using multiple similarity metrics
similarities = []
euclidean_distances = []
for stored_embedding in stored_embeddings:
# Cosine similarity
cos_sim = cosine_similarity([test_embedding], [stored_embedding])[0][0]
similarities.append(cos_sim)
# Euclidean distance (normalized)
euclidean_dist = np.linalg.norm(test_embedding - stored_embedding)
euclidean_distances.append(euclidean_dist)
# Calculate multiple similarity metrics
max_similarity = max(similarities)
mean_similarity = np.mean(similarities)
min_euclidean = min(euclidean_distances)
mean_euclidean = np.mean(euclidean_distances)
# Advanced confidence scoring using multiple metrics
# Normalize euclidean distance to similarity (0-1 range)
euclidean_similarity = 1 / (1 + mean_euclidean)
# Weighted combination of multiple metrics
confidence = (
0.4 * max_similarity + # Best cosine similarity
0.3 * mean_similarity + # Average cosine similarity
0.2 * euclidean_similarity + # Euclidean-based similarity
0.1 * (1 - min_euclidean / (1 + min_euclidean)) # Min distance similarity
)
# Dynamic threshold based on enrollment quality
dynamic_threshold = self.similarity_threshold
if len(stored_embeddings) >= 5:
dynamic_threshold *= 0.95 # Lower threshold for well-enrolled speakers
elif len(stored_embeddings) < 3:
dynamic_threshold *= 1.05 # Higher threshold for poorly enrolled speakers
# Verification decision
verified = confidence >= dynamic_threshold
# Additional confidence factors
enrollment_quality = min(len(stored_embeddings) / 10.0, 1.0) # 0-1 scale
final_confidence = confidence * (0.8 + 0.2 * enrollment_quality)
return {
'verified': verified,
'confidence': float(final_confidence),
'raw_confidence': float(confidence),
'max_similarity': float(max_similarity),
'mean_similarity': float(mean_similarity),
'euclidean_similarity': float(euclidean_similarity),
'threshold': float(dynamic_threshold),
'enrollment_segments': len(stored_embeddings),
'enrollment_quality': float(enrollment_quality),
'verification_method': self.model_type
}
except Exception as e:
logger.error(f"Error verifying speaker {speaker_id}: {e}")
return {
'verified': False,
'confidence': 0.0,
'error': str(e)
}
def identify_speaker(self, audio_path: str, start_time: float, end_time: float) -> Dict:
"""
Identify the most likely speaker from the enrolled database.
Args:
audio_path: Path to audio file
start_time: Start time of segment
end_time: End time of segment
Returns:
Dictionary with identification results
"""
try:
if not self.speaker_database:
return {
'identified_speaker': None,
'confidence': 0.0,
'error': "No speakers enrolled in database"
}
# Extract embedding from test segment
test_embedding = self.extract_speaker_embedding(audio_path, start_time, end_time)
best_speaker = None
best_confidence = 0.0
all_scores = {}
# Compare against all enrolled speakers
for speaker_id, speaker_data in self.speaker_database.items():
stored_embeddings = speaker_data['embeddings']
similarities = []
for stored_embedding in stored_embeddings:
similarity = cosine_similarity([test_embedding], [stored_embedding])[0][0]
similarities.append(similarity)
confidence = np.mean(similarities)
all_scores[speaker_id] = confidence
if confidence > best_confidence:
best_confidence = confidence
best_speaker = speaker_id
return {
'identified_speaker': best_speaker,
'confidence': float(best_confidence),
'all_scores': all_scores,
'threshold': self.similarity_threshold
}
except Exception as e:
logger.error(f"Error identifying speaker: {e}")
return {
'identified_speaker': None,
'confidence': 0.0,
'error': str(e)
}
def _save_speaker_database(self):
"""Save speaker database to disk."""
try:
db_path = self.cache_dir / "speaker_database.pkl"
self.cache_dir.mkdir(exist_ok=True)
with open(db_path, 'wb') as f:
pickle.dump(self.speaker_database, f)
except Exception as e:
logger.error(f"Error saving speaker database: {e}")
def _load_speaker_database(self):
"""Load speaker database from disk."""
try:
db_path = self.cache_dir / "speaker_database.pkl"
if db_path.exists():
with open(db_path, 'rb') as f:
self.speaker_database = pickle.load(f)
logger.info(f"Loaded speaker database with {len(self.speaker_database)} speakers")
except Exception as e:
logger.error(f"Error loading speaker database: {e}")
self.speaker_database = {}
def get_speaker_statistics(self) -> Dict:
"""Get statistics about enrolled speakers."""
if not self.speaker_database:
return {'total_speakers': 0, 'speakers': []}
speakers_info = []
for speaker_id, data in self.speaker_database.items():
speakers_info.append({
'speaker_id': speaker_id,
'enrollment_segments': data['enrollment_time'],
'audio_path': data['audio_path']
})
return {
'total_speakers': len(self.speaker_database),
'speakers': speakers_info
}
def clear_database(self):
"""Clear all enrolled speakers."""
self.speaker_database = {}
self._save_speaker_database()
logger.info("Speaker database cleared")