from __future__ import annotations

import av
import torchaudio
import torch
import comfy.model_management
import folder_paths
import os
import io
import json
import random
import hashlib
import node_helpers
from comfy.cli_args import args
from comfy.comfy_types import FileLocator

class EmptyLatentAudio:
    def __init__(self):
        self.device = comfy.model_management.intermediate_device()

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"seconds": ("FLOAT", {"default": 47.6, "min": 1.0, "max": 1000.0, "step": 0.1}),
                             "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."}),
                             }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "generate"
    CATEGORY = "latent/audio"

    def generate(self, seconds, batch_size):
        # One latent position covers 2048 audio samples at 44.1 kHz; round the length to an even number.
        length = round((seconds * 44100 / 2048) / 2) * 2
        latent = torch.zeros([batch_size, 64, length], device=self.device)
        return ({"samples": latent, "type": "audio"}, )

class ConditioningStableAudio:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"positive": ("CONDITIONING", ),
                             "negative": ("CONDITIONING", ),
                             "seconds_start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1000.0, "step": 0.1}),
                             "seconds_total": ("FLOAT", {"default": 47.0, "min": 0.0, "max": 1000.0, "step": 0.1}),
                             }}

    RETURN_TYPES = ("CONDITIONING", "CONDITIONING")
    RETURN_NAMES = ("positive", "negative")
    FUNCTION = "append"
    CATEGORY = "conditioning"

    def append(self, positive, negative, seconds_start, seconds_total):
        positive = node_helpers.conditioning_set_values(positive, {"seconds_start": seconds_start, "seconds_total": seconds_total})
        negative = node_helpers.conditioning_set_values(negative, {"seconds_start": seconds_start, "seconds_total": seconds_total})
        return (positive, negative)

class VAEEncodeAudio:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO", ), "vae": ("VAE", )}}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"
    CATEGORY = "latent/audio"

    def encode(self, vae, audio):
        sample_rate = audio["sample_rate"]
        # The audio VAE expects 44.1 kHz input; resample if needed.
        if 44100 != sample_rate:
            waveform = torchaudio.functional.resample(audio["waveform"], sample_rate, 44100)
        else:
            waveform = audio["waveform"]

        t = vae.encode(waveform.movedim(1, -1))
        return ({"samples": t}, )

class VAEDecodeAudio:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"samples": ("LATENT", ), "vae": ("VAE", )}}

    RETURN_TYPES = ("AUDIO",)
    FUNCTION = "decode"
    CATEGORY = "latent/audio"

    def decode(self, vae, samples):
        audio = vae.decode(samples["samples"]).movedim(-1, 1)
        # Attenuate overly loud outputs: scale down any batch item whose std * 5 exceeds 1.0.
        std = torch.std(audio, dim=[1, 2], keepdim=True) * 5.0
        std[std < 1.0] = 1.0
        audio /= std
        return ({"waveform": audio, "sample_rate": 44100}, )

def save_audio(self, audio, filename_prefix="ComfyUI", format="flac", prompt=None, extra_pnginfo=None, quality="128k"):
    filename_prefix += self.prefix_append
    full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)
    results: list[FileLocator] = []

    # Prepare metadata dictionary
    metadata = {}
    if not args.disable_metadata:
        if prompt is not None:
            metadata["prompt"] = json.dumps(prompt)
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata[x] = json.dumps(extra_pnginfo[x])

    # Opus supported sample rates
    OPUS_RATES = [8000, 12000, 16000, 24000, 48000]

    for (batch_number, waveform) in enumerate(audio["waveform"].cpu()):
        filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
        file = f"{filename_with_batch_num}_{counter:05}_.{format}"
        output_path = os.path.join(full_output_folder, file)

        # Use original sample rate initially
        sample_rate = audio["sample_rate"]

        # Handle Opus sample rate requirements
        if format == "opus":
            if sample_rate > 48000:
                sample_rate = 48000
            elif sample_rate not in OPUS_RATES:
                # Find the next highest supported rate
                for rate in sorted(OPUS_RATES):
                    if rate > sample_rate:
                        sample_rate = rate
                        break
                if sample_rate not in OPUS_RATES:  # Fallback if still not supported
                    sample_rate = 48000

        # Resample if necessary
        if sample_rate != audio["sample_rate"]:
            waveform = torchaudio.functional.resample(waveform, audio["sample_rate"], sample_rate)

        # Create in-memory WAV buffer
        wav_buffer = io.BytesIO()
        torchaudio.save(wav_buffer, waveform, sample_rate, format="WAV")
        wav_buffer.seek(0)  # Rewind for reading

        # Use PyAV to convert and add metadata
        input_container = av.open(wav_buffer)

        # Create output with specified format
        output_buffer = io.BytesIO()
        output_container = av.open(output_buffer, mode='w', format=format)

        # Set metadata on the container
        for key, value in metadata.items():
            output_container.metadata[key] = value
        # Set up the output stream with appropriate codec and bitrate
        if format == "opus":
            out_stream = output_container.add_stream("libopus", rate=sample_rate)
            if quality == "64k":
                out_stream.bit_rate = 64000
            elif quality == "96k":
                out_stream.bit_rate = 96000
            elif quality == "128k":
                out_stream.bit_rate = 128000
            elif quality == "192k":
                out_stream.bit_rate = 192000
            elif quality == "320k":
                out_stream.bit_rate = 320000
        elif format == "mp3":
            out_stream = output_container.add_stream("libmp3lame", rate=sample_rate)
            if quality == "V0":
                # TODO: it would be nice to also support V3 and V5, but there doesn't seem to be a way to set the qscale level; the property below is a bool
                out_stream.codec_context.qscale = 1
            elif quality == "128k":
                out_stream.bit_rate = 128000
            elif quality == "320k":
                out_stream.bit_rate = 320000
        else:  # format == "flac"
            out_stream = output_container.add_stream("flac", rate=sample_rate)
        # Copy frames from input to output
        for frame in input_container.decode(audio=0):
            frame.pts = None  # Let PyAV handle timestamps
            output_container.mux(out_stream.encode(frame))

        # Flush encoder
        output_container.mux(out_stream.encode(None))

        # Close containers
        output_container.close()
        input_container.close()

        # Write the output to file
        output_buffer.seek(0)
        with open(output_path, 'wb') as f:
            f.write(output_buffer.getbuffer())

        results.append({
            "filename": file,
            "subfolder": subfolder,
            "type": self.type
        })
        counter += 1

    return {"ui": {"audio": results}}

class SaveAudio:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO", ),
                             "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}),
                             },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

    RETURN_TYPES = ()
    FUNCTION = "save_flac"
    OUTPUT_NODE = True
    CATEGORY = "audio"

    def save_flac(self, audio, filename_prefix="ComfyUI", format="flac", prompt=None, extra_pnginfo=None):
        return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo)

class SaveAudioMP3:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO", ),
                             "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}),
                             "quality": (["V0", "128k", "320k"], {"default": "V0"}),
                             },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

    RETURN_TYPES = ()
    FUNCTION = "save_mp3"
    OUTPUT_NODE = True
    CATEGORY = "audio"

    def save_mp3(self, audio, filename_prefix="ComfyUI", format="mp3", prompt=None, extra_pnginfo=None, quality="128k"):
        return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo, quality)

class SaveAudioOpus:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO", ),
                             "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}),
                             "quality": (["64k", "96k", "128k", "192k", "320k"], {"default": "128k"}),
                             },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

    RETURN_TYPES = ()
    FUNCTION = "save_opus"
    OUTPUT_NODE = True
    CATEGORY = "audio"

    def save_opus(self, audio, filename_prefix="ComfyUI", format="opus", prompt=None, extra_pnginfo=None, quality="128k"):
        return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo, quality)

class PreviewAudio(SaveAudio):
    def __init__(self):
        self.output_dir = folder_paths.get_temp_directory()
        self.type = "temp"
        self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstuvwxyz") for x in range(5))

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"audio": ("AUDIO", ), },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

class LoadAudio:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = folder_paths.filter_files_content_types(os.listdir(input_dir), ["audio", "video"])
        return {"required": {"audio": (sorted(files), {"audio_upload": True})}}

    CATEGORY = "audio"
    RETURN_TYPES = ("AUDIO", )
    FUNCTION = "load"

    def load(self, audio):
        audio_path = folder_paths.get_annotated_filepath(audio)
        waveform, sample_rate = torchaudio.load(audio_path)
        audio = {"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate}
        return (audio, )

    @classmethod
    def IS_CHANGED(s, audio):
        # Hash the file contents so the node re-executes when the file changes.
        audio_path = folder_paths.get_annotated_filepath(audio)
        m = hashlib.sha256()
        with open(audio_path, 'rb') as f:
            m.update(f.read())
        return m.digest().hex()

    @classmethod
    def VALIDATE_INPUTS(s, audio):
        if not folder_paths.exists_annotated_filepath(audio):
            return "Invalid audio file: {}".format(audio)
        return True

NODE_CLASS_MAPPINGS = {
    "EmptyLatentAudio": EmptyLatentAudio,
    "VAEEncodeAudio": VAEEncodeAudio,
    "VAEDecodeAudio": VAEDecodeAudio,
    "SaveAudio": SaveAudio,
    "SaveAudioMP3": SaveAudioMP3,
    "SaveAudioOpus": SaveAudioOpus,
    "LoadAudio": LoadAudio,
    "PreviewAudio": PreviewAudio,
    "ConditioningStableAudio": ConditioningStableAudio,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "EmptyLatentAudio": "Empty Latent Audio",
    "VAEEncodeAudio": "VAE Encode Audio",
    "VAEDecodeAudio": "VAE Decode Audio",
    "PreviewAudio": "Preview Audio",
    "LoadAudio": "Load Audio",
    "SaveAudio": "Save Audio (FLAC)",
    "SaveAudioMP3": "Save Audio (MP3)",
    "SaveAudioOpus": "Save Audio (Opus)",
}