import os
import time

import requests
import streamlit as st
from dotenv import find_dotenv, load_dotenv
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.document_loaders import WikipediaLoader
from transformers import MarianMTModel, MarianTokenizer, pipeline
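# NOTE: these `langchain.*` import paths assume a pre-0.1 LangChain release;
# newer versions expose the same classes under the `langchain_community`
# and `langchain_openai` packages instead.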
# Progress bar for the Streamlit UI
def progress_bar(amount_of_time: int) -> None:
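    # Purely cosmetic: animates `amount_of_time` steps at 0.04 s apiece
    # (about 4 s for 100 steps); it is not tied to actual model progress.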
    progress_text = "Please wait, the generative models are hard at work..."
    my_bar = st.progress(0, text=progress_text)
    for percent_complete in range(amount_of_time):
        time.sleep(0.04)
        my_bar.progress(percent_complete + 1, text=progress_text)
    time.sleep(1)
    my_bar.empty()
# Load a small knowledge base from Wikipedia
def load_knowledge_base():
    # Load documents matching a query (here: "storytelling"). Recent
    # WikipediaLoader versions cap results via `load_max_docs` rather than
    # `max_results`; we simply slice the returned list instead.
    loader = WikipediaLoader(query="storytelling")
    documents = loader.load()
    # Keep only the top 3 documents
    return documents[:3]
# Generate a story from the scenario, with retrieved context (RAG), using LangChain
def generate_story_with_rag(scenario: str, openai_api_key: str) -> str:
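    # RAG flow: embed the Wikipedia documents, index them in FAISS, retrieve
    # the 3 nearest neighbours to the scenario in embedding space, then stuff
    # the retrieved text into the prompt alongside the scenario.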
    # Load the knowledge base (Wikipedia articles in this case)
    documents = load_knowledge_base()

    # Create embeddings for the documents and index them in FAISS
    embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
    vectorstore = FAISS.from_documents(documents, embeddings)

    # Retrieve the documents most relevant to the scenario
    relevant_docs = vectorstore.similarity_search(scenario, k=3)

    # Combine the retrieved context and the scenario for story generation
    context = "\n".join(doc.page_content for doc in relevant_docs)
    # Generate the story from the combined context and scenario
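    # The template must stay a plain string (not an f-string) so that
    # {context} and {scenario} survive as PromptTemplate variables.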
    prompt_template = """
    You are a talented storyteller who can craft a compelling story from the material below.
    Use the context together with the scenario to generate a creative story.
    The story should be at most 100 words long:

    CONTEXT: {context}
    SCENARIO: {scenario}
    STORY:
    """
    prompt = PromptTemplate(template=prompt_template, input_variables=["scenario", "context"])
    llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.9, openai_api_key=openai_api_key)
    story_llm = LLMChain(llm=llm, prompt=prompt, verbose=True)
    generated_story = story_llm.predict(scenario=scenario, context=context)
    return generated_story
# Generate a caption from an image with BLIP
def generate_text_from_image(image_path: str) -> str:
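    # Note: building the pipeline here reloads BLIP on every call; wrapping
    # this in st.cache_resource would avoid that in a Streamlit app.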
    image_to_text = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")
    generated_text = image_to_text(image_path)[0]["generated_text"]
    print(f"IMAGE INPUT: {image_path}")
    print(f"GENERATED TEXT OUTPUT: {generated_text}")
    return generated_text
# Translate the story with MarianMT (Hugging Face)
def translate_with_huggingface(text: str, target_language: str) -> str:
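    # Helsinki-NLP publishes one opus-mt checkpoint per language pair, so the
    # model name is derived from the target language code (e.g. en -> fr).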
    model_name = f"Helsinki-NLP/opus-mt-en-{target_language}"
    model = MarianMTModel.from_pretrained(model_name)
    tokenizer = MarianTokenizer.from_pretrained(model_name)

    # Tokenize the input text
    inputs = tokenizer.encode(text, return_tensors="pt", padding=True, truncation=True)

    # Perform the translation with beam search
    translated = model.generate(inputs, max_length=512, num_beams=4, early_stopping=True)

    # Decode the translated text
    translated_text = tokenizer.decode(translated[0], skip_special_tokens=True)
    return translated_text
# Generate speech from text (English) via the Hugging Face Inference API;
# returns True when the audio file was written successfully
def generate_speech_from_text(message: str, huggingface_token: str) -> bool:
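    # On success the Inference API responds with raw audio bytes (FLAC for
    # this VITS model), which are written straight to disk below.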
    API_URL = "https://api-inference.huggingface.co/models/espnet/kan-bayashi_ljspeech_vits"
    headers = {"Authorization": f"Bearer {huggingface_token}"}
    payloads = {"inputs": message}
    response = requests.post(API_URL, headers=headers, json=payloads)

    # A 503 means the model is still being loaded on the inference backend
    if response.status_code == 503:
        try:
            info = response.json()
            wait_time = round(info.get("estimated_time", 10))
        except Exception:
            wait_time = 10
        st.warning(f"Model is loading... please wait ~{wait_time} seconds and try again.")
        return False
    # Handle other errors
    if response.status_code != 200:
        try:
            error_message = response.json().get("error", "Unknown error")
        except Exception:
            error_message = "Unknown error"
        st.error(f"⚠️ Audio generation failed: {response.status_code} - {error_message}")
        return False
    # Save the audio if successful
    with open("generated_audio.flac", "wb") as file:
        file.write(response.content)
    return True
# Main function to tie everything together
def main() -> None:
    st.set_page_config(page_title="Image to Speech", page_icon="🖼️")

    with st.sidebar:
        huggingface_token = st.text_input("Enter your Hugging Face Token:", type="password")
        openai_token = st.text_input("Enter your OpenAI API Key:", type="password")

    st.header("Image to Speech")

    # Image uploader
    uploaded_file = st.file_uploader("Please choose an image to upload", type=["jpg", "jpeg", "png", "webp"])

    # Language selection
    language_options = ["en", "fr", "es", "de", "it", "pt"]  # Add more languages as needed
    selected_language = st.selectbox("Select the language for the story", language_options)
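    # Each non-English code above needs a matching Helsinki-NLP/opus-mt-en-<code>
    # checkpoint on the Hugging Face Hub, or translation will fail at model load.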
    if uploaded_file is not None and huggingface_token and openai_token:
        # Save the uploaded image, preserving its original extension
        image_path = f"temp_image.{uploaded_file.name.split('.')[-1]}"
        with open(image_path, "wb") as file:
            file.write(uploaded_file.getvalue())

        # Show the uploaded image
        st.image(uploaded_file, caption="Uploaded Image", use_column_width=True)

        # Process the image
        progress_bar(100)
        scenario = generate_text_from_image(image_path)
        story = generate_story_with_rag(scenario, openai_token)

        # Translate the story if the selected language is not English
        translated_story = ""
        if selected_language != "en":
            translated_story = translate_with_huggingface(story, selected_language)

        # Generate speech in English
        audio_ok = generate_speech_from_text(story, huggingface_token)
        # Display the generated scenario and stories
        with st.expander("Generated Image Scenario"):
            st.write(scenario)
        with st.expander("Generated Short Story (English)"):
            st.write(story)
        if selected_language != "en":
            with st.expander(f"Generated Short Story ({selected_language.upper()})"):
                st.write(translated_story)

        # Provide a download link for the story
        st.download_button("Download Story (English)", story, file_name="generated_story.txt")

        # Play and offer the generated audio (English only) when it was produced
        if audio_ok:
            st.audio("generated_audio.flac")
            # Pass the file's bytes, not its path: a plain string would be
            # downloaded literally as text
            with open("generated_audio.flac", "rb") as audio_file:
                st.download_button("Download Audio (English)", audio_file.read(), file_name="generated_audio.flac")
    elif uploaded_file is not None:
        st.warning("Please enter both API tokens in the sidebar.")

if __name__ == "__main__":
    load_dotenv(find_dotenv())  # still useful for fallback env vars
    main()
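# To launch the app locally (assuming this file is saved as app.py):
#   streamlit run app.py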