# video_editing_genie_mcp_server / videos_management.py
# Last commit: "fixes" (462d0d3) by MalikIbrar
from pathlib import Path
import uuid
import os
import shutil
import tempfile
import requests
import mimetypes
import subprocess
import json
from typing import Optional, Tuple
# Base URL prepended to the "/file=videos/<name>" links built in this module.
# Read from the APP_BASE_URL environment variable, defaulting to a local
# Gradio API endpoint.
APP_BASE_URL = os.getenv("APP_BASE_URL", "http://127.0.0.1:7860/gradio_api")
def get_video_format(video_path: str) -> Tuple[str, str]:
    """
    Detect the media format of a file and return a matching extension and MIME type.

    Runs ``ffprobe -show_streams`` on the file and inspects the codec of the
    reported streams. A video stream takes precedence over an audio stream.
    Streams flagged as ``attached_pic`` (embedded cover art, which ffprobe
    reports with ``codec_type == "video"``) are ignored so that an audio file
    carrying album art is still classified as audio.

    Args:
        video_path: Path to the media file to inspect.

    Returns:
        A ``(extension, mime_type)`` tuple, the extension given without a
        leading dot. Falls back to ``("mp4", "video/mp4")`` when ffprobe
        fails, is unavailable, or reports no audio/video stream.
    """
    # Codec name (as printed by ffprobe) -> file extension.
    video_codec_to_ext = {
        "h264": "mp4", "hevc": "mp4", "vp8": "webm", "vp9": "webm",
        "av1": "webm", "mpeg4": "mp4", "mpeg2video": "mpg",
        "mjpeg": "mjpg", "prores": "mov",
    }
    audio_codec_to_ext = {
        "aac": "aac", "mp3": "mp3", "opus": "ogg", "vorbis": "ogg",
        "flac": "flac", "pcm_s16le": "wav",
    }

    try:
        # Get stream information using ffprobe in JSON format
        cmd = [
            "ffprobe", "-v", "error",
            "-show_streams", "-of", "json",
            video_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        probe_data = json.loads(result.stdout)

        has_video = False
        has_audio = False
        video_codec = ""
        audio_codec = ""

        for stream in probe_data.get("streams", []):
            codec_type = stream.get("codec_type")
            if codec_type == "video":
                # Cover art embedded in audio files shows up as a video
                # stream; it must not turn the file into a "video".
                if (stream.get("disposition") or {}).get("attached_pic"):
                    continue
                has_video = True
                video_codec = stream.get("codec_name", "").lower()
            elif codec_type == "audio":
                has_audio = True
                audio_codec = stream.get("codec_name", "").lower()

        print(f"[get_video_format] Detected: has_video={has_video}, has_audio={has_audio}, video_codec={video_codec}, audio_codec={audio_codec}")

        if has_video:
            extension = video_codec_to_ext.get(video_codec, "mp4")
            mime_type = mimetypes.guess_type(f"video.{extension}")[0] or "video/mp4"
            return extension, mime_type

        if has_audio:
            extension = audio_codec_to_ext.get(audio_codec, "mp3")
            mime_type = mimetypes.guess_type(f"audio.{extension}")[0] or "audio/mp3"
            return extension, mime_type

        # No usable video or audio stream (shouldn't happen for valid media)
        return "mp4", "video/mp4"
    except Exception as e:
        # Deliberately broad: covers a non-zero ffprobe exit, a missing
        # ffprobe binary and unparsable output. Detection is best-effort.
        print(f"Error detecting media format: {e}. Falling back to mp4.")
        return "mp4", "video/mp4"
def generate_unique_id() -> str:
    """Return a freshly generated random UUID (version 4) as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
def save_video_to_static_folder(video_path: str, unique_id: Optional[str] = None) -> Optional[str]:
    """
    Copy a media file into the ``videos`` folder under the current working
    directory and return the URL it is served from.

    The stored file is named ``<unique_id>.<extension>``, where the extension
    comes from ``get_video_format``.

    Args:
        video_path: Path to the source file.
        unique_id: Identifier used as the stored file's stem. A new one is
            generated when omitted.
    Returns:
        URL of the stored file, or None if any step raised an exception.
    """
    try:
        file_id = generate_unique_id() if unique_id is None else unique_id

        videos_dir = Path.cwd().absolute() / "videos"
        videos_dir.mkdir(parents=True, exist_ok=True)

        # Only the extension is needed here; the MIME type is discarded.
        ext = get_video_format(video_path)[0]
        target = videos_dir / f"{file_id}.{ext}"
        print(f"[save_video_to_static_folder] Saving {video_path} as {target} with extension {ext}")

        shutil.copy(video_path, target)
    except Exception as e:
        print(f"[save_video_to_static_folder] Error: {e}")
        return None
    else:
        return f"{APP_BASE_URL}/file=videos/{file_id}.{ext}"
def _guess_download_extension(video_url: str, content_type: str) -> str:
    """Pick a temp-file extension from the response content type, else from the URL."""
    # Video MIME types whose subtype is not a usable file extension.
    mime_to_ext = {
        "video/mpeg": "mp4",
        "video/quicktime": "mov",
        "video/x-msvideo": "avi",
        "video/x-matroska": "mkv",
    }
    # Drop parameters such as "; codecs=..." before looking at the type.
    mime = content_type.split(";", 1)[0].strip().lower()
    if mime.startswith("video/"):
        subtype = mime.split("/", 1)[1]
        return mime_to_ext.get(mime, subtype or "mp4")

    # Not a video content type: try the URL path, ignoring query and fragment.
    url_path = video_url.split("?", 1)[0].split("#", 1)[0]
    url_ext = os.path.splitext(url_path)[1].lower().lstrip(".")
    return url_ext if url_ext in ("mp4", "webm", "mov", "avi", "mkv") else "mp4"


def download_video_from_url(video_url: str) -> Optional[str]:
    """
    Resolve a video reference to a file on disk, downloading it if necessary.

    Resolution order:
      1. An existing local path is returned unchanged.
      2. A URL under ``{APP_BASE_URL}/file=videos/`` is mapped to the matching
         file in the ``videos`` folder of the current working directory. The
         file name from the URL is tried first (so any stored extension
         works), then ``<id>.mp4``.
      3. Anything else is fetched over HTTP into a temporary file, which is
         then checked with ffprobe.

    Args:
        video_url: URL of the video or path to a local file.
    Returns:
        Path to the local or downloaded temporary file, or None if the input
        is empty, the download fails, or ffprobe rejects the file. The caller
        is responsible for deleting a returned temporary file.
    """
    if not video_url:
        return None
    video_url = str(video_url)

    # Handle local file paths
    if os.path.exists(video_url):
        return video_url

    # Handle URLs that point back at this server's own videos folder
    if video_url.startswith(f"{APP_BASE_URL}/file=videos/"):
        file_name = video_url.split("/")[-1].split("?", 1)[0]
        unique_id = file_name.split(".")[0]
        videos_dir = Path.cwd().absolute() / "videos"
        for candidate in (videos_dir / file_name, videos_dir / f"{unique_id}.mp4"):
            if candidate.is_file():
                return str(candidate)

    temp_path: Optional[str] = None
    try:
        # Browser-like User-Agent for the download request
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # The context manager releases the streamed connection on every exit path.
        with requests.get(video_url, stream=True, headers=headers, timeout=30) as response:
            if response.status_code != 200:
                return None

            ext = _guess_download_extension(
                video_url, response.headers.get('content-type', '')
            )
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{ext}") as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)

        # Verify it's a valid media file; a non-zero exit raises CalledProcessError
        subprocess.run(["ffprobe", "-v", "error", temp_path], capture_output=True, check=True)
        return temp_path
    except Exception:
        # Best-effort contract: report failure as None, but never leave a
        # partially written or invalid temp file behind.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        return None
def upload_file_to_server(file_path: str) -> Optional[str]:
    """
    Copy a local file into the ``videos`` folder under the current working
    directory and return the URL it is served from.

    The stored file gets a newly generated ID as its stem and an extension
    chosen by ``get_video_format``.

    Args:
        file_path: Path to the local file to upload.
    Returns:
        URL of the stored file, or None if the path does not exist or any
        step raised an exception.
    """
    try:
        if os.path.exists(file_path):
            file_id = generate_unique_id()

            # Make sure the destination folder is there before copying.
            videos_dir = Path.cwd().absolute() / "videos"
            videos_dir.mkdir(parents=True, exist_ok=True)

            # Only the extension is used; the MIME type is discarded.
            ext = get_video_format(file_path)[0]
            stored_name = f"{file_id}.{ext}"
            destination = videos_dir / stored_name
            print(f"[upload_file_to_server] Uploading {file_path} as {destination} with extension {ext}")

            shutil.copy(file_path, destination)
            return f"{APP_BASE_URL}/file=videos/{stored_name}"

        print(f"[upload_file_to_server] File not found: {file_path}")
        return None
    except Exception as e:
        print(f"[upload_file_to_server] Error: {e}")
        return None