import os
import tempfile
import traceback
from contextlib import asynccontextmanager

import librosa
import torch
from dotenv import load_dotenv
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from peft import PeftModel
from pydub import AudioSegment
from transformers import WhisperProcessor, WhisperForConditionalGeneration, pipeline

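# Global ASR pipeline; populated once at startup by the lifespan handler below.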
transcriber = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global transcriber

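    # Redirect the Hugging Face caches to a writable location (assuming the
    # host only allows writes under /tmp, as on many container platforms).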
    cache_dir = "/tmp/hf_cache"
    os.makedirs(cache_dir, exist_ok=True)
    os.environ["HF_HOME"] = cache_dir
    os.environ["TRANSFORMERS_CACHE"] = cache_dir
    os.environ["HF_HUB_CACHE"] = cache_dir

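    # librosa depends on numba, whose JIT compiler also needs a writable cache.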
    numba_cache_dir = os.path.join(cache_dir, "numba_cache")
    os.makedirs(numba_cache_dir, exist_ok=True)
    os.environ["NUMBA_CACHE_DIR"] = numba_cache_dir

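    # Use float16 on GPU when available; fall back to float32 on CPU.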
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if device == "cuda:0" else torch.float32

    load_dotenv(override=True)
    hf_token = os.getenv("HF_TOKEN")
    # Log only whether the token is present; never print the secret itself.
    print("After load_dotenv, HF_TOKEN found:", hf_token is not None)

    if hf_token is None:
        raise ValueError("Hugging Face token not found. Please set the HF_TOKEN environment variable.")

    BASE_MODEL_PATH = "openai/whisper-large-v3-turbo"
    ADAPTER_AND_PROCESSOR_PATH = "Tiberiw/whisper-large-turbo-lora-finetuned-v3"

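    # Load the processor from the adapter repo (presumably saved there during
    # fine-tuning), the weights from the base model, then attach the LoRA
    # adapter on top via PEFT.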
    processor = WhisperProcessor.from_pretrained(
        ADAPTER_AND_PROCESSOR_PATH,
        token=hf_token,
        cache_dir=cache_dir,
    )
    base_model = WhisperForConditionalGeneration.from_pretrained(
        BASE_MODEL_PATH,
        torch_dtype=torch_dtype,
        cache_dir=cache_dir,
    )
    final_model = PeftModel.from_pretrained(
        base_model,
        ADAPTER_AND_PROCESSOR_PATH,
        token=hf_token,
        cache_dir=cache_dir,
    )
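    # Build the HF ASR pipeline around the adapter-wrapped model and its processor.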
    transcriber = pipeline(
        "automatic-speech-recognition",
        model=final_model,
        torch_dtype=torch_dtype,
        device=device,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
    )
    print("Model loaded successfully!")
    yield


app = FastAPI(lifespan=lifespan)

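# Wide-open CORS for development; a production deployment would likely
# restrict allow_origins to the frontend's domain.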
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def load_audio(path: str):
    """Decode an audio file to a 16 kHz mono float array, as Whisper expects."""
    try:
        audio_array, _ = librosa.load(path, sr=16000, mono=True)
        return audio_array
    except Exception as e:
        msg = f"Failed to load audio: {str(e)}\n{traceback.format_exc()}"
        # These backend error names indicate an unsupported or corrupt format.
        if any(err in str(e) for err in ["NoBackendError", "SoundFileNotOpen", "Unsupported format", "AudioreadError"]):
            raise HTTPException(status_code=415, detail=msg + "\nSupported formats: (WEBM, WAV, MP3, FLAC)")
        raise HTTPException(status_code=500, detail=msg)


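# Accept an uploaded audio file, convert WEBM to MP3 when needed, and return
# the transcription produced by the fine-tuned Whisper pipeline.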
@app.post("/api/transcription")
async def transcribe_pipeline(file: UploadFile = File(...)):
    if not file.content_type or not file.content_type.startswith("audio/"):
        raise HTTPException(status_code=400, detail="Invalid file content type.")

    print(f"Received file: {file.filename}, Content-Type: {file.content_type}")

    # Track both temp paths so the `finally` block can clean them up.
    original_temp_path = None
    input_for_librosa_path = None

    try:
        # Keep the original extension so the decoder can infer the format.
        file_suffix = ".unknown"
        if file.filename:
            _, ext = os.path.splitext(file.filename)
            if ext:
                file_suffix = ext
        print(f"Saving uploaded file to temporary location with suffix '{file_suffix}'")
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_suffix) as temp_orig_file:
            content = await file.read()
            temp_orig_file.write(content)
            original_temp_path = temp_orig_file.name

        await file.close()

        if file.content_type.startswith("audio/webm"):
            # Browser recordings often arrive as WEBM, which librosa may not
            # decode directly, so convert to MP3 with pydub first.
            print(f"Conversion needed for '{original_temp_path}' (Content-Type: {file.content_type}) to MP3.")

            # Create the destination file up front; only its path is needed below.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_conv_file:
                input_for_librosa_path = temp_conv_file.name

            try:
                # pydub delegates decoding and encoding to FFmpeg under the hood.
                audio = AudioSegment.from_file(original_temp_path)
                audio.export(input_for_librosa_path, format="mp3")
                print(f"Successfully converted '{original_temp_path}' to MP3: '{input_for_librosa_path}'")
            except Exception as e:
                err_msg = f"Audio conversion failed: {str(e)}\n{traceback.format_exc()}"
                if "ffmpeg" in str(e).lower():
                    err_msg += "\nEnsure FFmpeg is installed and in PATH."
                raise HTTPException(status_code=500, detail=err_msg)
        else:
            input_for_librosa_path = original_temp_path
            # Both variables now point at the same file; clear one so the
            # cleanup in `finally` does not delete it twice.
            original_temp_path = None

        audio_array = load_audio(input_for_librosa_path)
        # `.copy()` presumably guards against the pipeline mutating the array in place.
        result = transcriber(audio_array.copy(), return_timestamps=True)
        return {"transcription": result["text"]}

    except HTTPException:
        # Re-raise HTTP errors as-is so their status codes are preserved.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}\n{traceback.format_exc()}")
    finally:
        # Remove any temp files that were created, on success or failure.
        for f in (original_temp_path, input_for_librosa_path):
            if f and os.path.exists(f):
                os.unlink(f)