test1 / app.py
Zw07's picture
Update app.py
f807785 verified
import time
import streamlit as st
# from transformers import pipeline
import os
import torch
import datetime
import numpy as np
import soundfile
from wavmark.utils import file_reader
from audioseal import AudioSeal
import torchaudio
from pydub import AudioSegment
import io
import librosa
import ffmpeg
def create_default_value():
if "def_value" not in st.session_state:
def_val_npy = np.random.choice([0, 1], size=32 - len_start_bit)
def_val_str = "".join([str(i) for i in def_val_npy])
st.session_state.def_value = def_val_str
def download_sample_audio():
url = "https://keithito.com/LJ-Speech-Dataset/LJ037-0171.wav"
with open("test.wav", "wb") as f:
resp = urllib.request.urlopen(url)
f.write(resp.read())
wav, sample_rate = torchaudio.load("test.wav")
return wav, sample_rate
# Main web app
def main():
create_default_value()
# st.title("MDS07 Demo Presentation")
# st.write("https://github.com/ravindi-r/audioseal")
markdown_text = """
# MDS07 Demo Presentation
[AudioSeal](https://github.com/ravindi-r/audioseal) is the next-generation watermarking tool driven by AI.
You can upload an audio file and encode a custom 16-bit watermark or perform decoding from a watermarked audio.
This page is for demonstration usage.
If you have longer files for processing, we recommend using [our python toolkit](https://github.com/ravindi-r/audioseal).
"""
# 使用st.markdown渲染Markdown文本
st.markdown(markdown_text)
audio_file = st.file_uploader("Upload Audio", type=["wav", "mp3"], accept_multiple_files=False)
try:
if audio_file:
#2nd attempt
# Save file to local storage
tmp_input_audio_file = os.path.join("/tmp/", audio_file.name)
file_extension = os.path.splitext(tmp_input_audio_file)[1].lower()
#st.markdown(file_extension)
if file_extension in [".wav", ".flac"]:
with open("test.wav", "wb") as f:
f.write(audio_file.getbuffer())
st.audio("test.wav", format="audio/wav")
elif file_extension == ".mp3":
with open("test.mp3", "wb") as f:
f.write(audio_file.getbuffer())
st.audio("test.mp3", format="audio/mpeg")
#Load the WAV file using torchaudio
if file_extension in [".wav", ".flac"]:
wav, sample_rate = torchaudio.load("test.wav")
# st.markdown("Before unsquueze wav")
# st.markdown(wav)
file_extension_ori =".wav"
#Unsqueeze for line 176
wav= wav.unsqueeze(0)
elif file_extension == ".mp3":
# Load an MP3 file
audio = AudioSegment.from_mp3("test.mp3")
# Export it as a WAV file
audio.export("test.wav", format="wav")
wav3, sample_rate = torchaudio.load("test.wav")
wav= wav3.unsqueeze(0)
file_extension_ori =".mp3"
file_extension =".wav"
action = st.selectbox("Select Action", ["Add Watermark", "Detect Watermark"])
if action == "Add Watermark":
#watermark_text = st.text_input("The watermark (0, 1 list of length-16):", value=st.session_state.def_value)
add_watermark_button = st.button("Add Watermark", key="add_watermark_btn")
if add_watermark_button: # 点击按钮后执行的
#if audio_file and watermark_text:
if audio_file:
with st.spinner("Adding Watermark..."):
#wav = my_read_file(wav,max_second_encode)
#1st attempt
watermark = model.get_watermark(wav, default_sr)
watermarked_audio = wav + watermark
print(watermarked_audio.size())
size = watermarked_audio.size()
#st.markdown(size)
print(watermarked_audio.squeeze())
squeeze = watermarked_audio.squeeze(1)
shape = squeeze.size()
#st.markdown(shape)
#st.markdown(squeeze)
if file_extension_ori in [".wav", ".flac"]:
torchaudio.save("output.wav", squeeze, default_sr, bits_per_sample=16)
watermarked_wav = torchaudio.save("output.wav", squeeze, default_sr, bits_per_sample=16)
st.audio("output.wav", format="audio/wav")
with open("output.wav", "rb") as file:
#file.read()
#file.write(watermarked_wav.getbuffer())
binary_data = file.read()
btn = st.download_button(
label="Download watermarked audio",
data=binary_data,
file_name="output.wav",
mime="audio/wav",
)
elif file_extension_ori == ".mp3":
torchaudio.save("output.wav", squeeze, default_sr)
watermarked_mp3 = torchaudio.save("output.wav", squeeze, default_sr)
audio = AudioSegment.from_wav("output.wav")
# Export as MP3
audio.export("output.mp3", format="mp3")
st.audio("output.mp3", format="audio/mpeg")
with open("output.mp3", "rb") as file:
#file.write(watermarked_wav.getbuffer())
binary_data = file.read()
st.download_button(
label="Download watermarked audio",
data=binary_data,
file_name="output.mp3",
mime="audio/mpeg",
)
elif action == "Detect Watermark":
detect_watermark_button = st.button("Detect Watermark", key="detect_watermark_btn")
if detect_watermark_button:
with st.spinner("Detecting..."):
# result, message = detector.detect_watermark(watermarked_audio, sample_rate=default_sr, message_threshold=0.5)
# st.markdown("Probability of audio being watermarked: ")
# st.markdown(result)
# st.markdown("This is likely a watermarked audio!")
# print(f"\nThis is likely a watermarked audio: {result}")
#Run on an unwatermarked audio
if file_extension in [".wav", ".flac"]:
wav, sample_rate = torchaudio.load("test.wav")
wav= wav.unsqueeze(0)
elif file_extension == ".mp3":
# Load an MP3 file
audio = AudioSegment.from_mp3("test.mp3")
# Export it as a WAV file
audio.export("test.wav", format="wav")
wav, sample_rate = torchaudio.load("test.wav")
wav= wav.unsqueeze(0)
result2, message2 = detector.detect_watermark(wav, sample_rate=default_sr, message_threshold=0.5)
print(f"This is likely an unwatermarked audio: {result2}")
st.markdown("Probability of audio being watermarked: ")
st.markdown(result2)
if result2 < 0.5:
st.markdown("This is likely an unwatermarked audio!")
else:
st.markdown("This is likely an watermarked audio!")
except RuntimeError:
st.error("Please input audio with one channel (mono-channel)")
if __name__ == "__main__":
default_sr = 16000
max_second_encode = 60
max_second_decode = 30
len_start_bit = 16
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# model = wavmark.load_model().to(device)
model = AudioSeal.load_generator("audioseal_wm_16bits")
detector = AudioSeal.load_detector(("audioseal_detector_16bits"))
main()