Spaces:
Runtime error
Runtime error
| from gradio_client import Client | |
| import numpy as np | |
| import gradio as gr | |
| import requests | |
| import json | |
| import dotenv | |
| import soundfile as sf | |
| import time | |
| import textwrap | |
| from PIL import Image | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import torch | |
| import os | |
| import uuid | |
| import optimum | |
# Markdown shown at the top of the Gradio page. The unmatched closing
# heading tag after the "Duplicate Space" badge and the stray trailing quote
# after the PolyGPT link were removed because they were broken markup.
welcome_message = """
# 👋🏻Welcome to ⚕🗣️😷TruEra - MultiMed ⚕🗣️😷
🗣️📝 This is an accessible and multimodal tool optimized using TruEra! We evaluated several configurations, prompts, and models to optimize this application.
### How To Use ⚕🗣️😷TruEra - MultiMed⚕:
🗣️📝Interact with ⚕🗣️😷TruEra - MultiMed⚕ in any language using image, audio or text. ⚕🗣️😷TruEra - MultiMed is an accessible application 📚🌟💼 that uses [Qwen/Qwen-1_8B-Chat](https://huggingface.co/Qwen/Qwen-1_8B-Chat) and [Tonic1/Official-Qwen-VL-Chat](https://huggingface.co/Qwen/Qwen-VL-Chat) with [Vectara](https://huggingface.co/vectara) embeddings + retrieval w/ [facebook/seamless-m4t-v2-large](https://huggingface.co/facebook/hf-seamless-m4t-large) for audio translation & accessibility.
do [get in touch](https://discord.gg/GWpVpekp). You can also use 😷TruEra MultiMed⚕️ on your own data & in your own way by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/TeamTonic/MultiMed?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a>
### Join us :
🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community on 👻Discord: [Discord](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)
"""
# Display name -> three-letter language code. The display names populate the
# language dropdown in the interface.
languages = {
    "English": "eng",
    "Modern Standard Arabic": "arb",
    "Bengali": "ben",
    "Catalan": "cat",
    "Czech": "ces",
    "Mandarin Chinese": "cmn",
    "Welsh": "cym",
    "Danish": "dan",
    "German": "deu",
    "Estonian": "est",
    "Finnish": "fin",
    "French": "fra",
    "Hindi": "hin",
    "Indonesian": "ind",
    "Italian": "ita",
    "Japanese": "jpn",
    "Korean": "kor",
    "Maltese": "mlt",
    "Dutch": "nld",
    "Western Persian": "pes",
    "Polish": "pol",
    "Portuguese": "por",
    "Romanian": "ron",
    "Russian": "rus",
    "Slovak": "slk",
    "Spanish": "spa",
    "Swedish": "swe",
    "Swahili": "swh",
    "Telugu": "tel",
    "Tagalog": "tgl",
    "Thai": "tha",
    "Turkish": "tur",
    "Ukrainian": "ukr",
    "Urdu": "urd",
    "Northern Uzbek": "uzn",
    "Vietnamese": "vie",
}
# Global holder for component references (not written to in the visible code).
components = {}

# Load variables from a local .env file into the environment before any
# os.getenv lookups below.
dotenv.load_dotenv()

# Gradio client for the hosted SeamlessM4T v2 space (speech/text translation).
seamless_client = Client("https://facebook-seamless-m4t-v2-large.hf.space/--replicas/2bmbx/")  # TruEra

# The same Hugging Face token is exposed under two names used in this module.
HuggingFace_Token = os.getenv("HuggingFace_Token")
hf_token = HuggingFace_Token

device = "cuda" if torch.cuda.is_available() else "cpu"
image_description = ""
def check_hallucination(assertion, citation):
    """Score an assertion against a citation with the Vectara hallucination model.

    Posts ``"{assertion} [SEP] {citation}"`` to the Hugging Face Inference API
    and formats the first returned score as Markdown.

    Args:
        assertion: Statement to evaluate.
        citation: Reference text the statement is compared with.

    Returns:
        A string of the form ``"**hallucination score:** <score>"``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        ValueError: If the decoded body is not shaped like
            ``[[{"score": ...}]]``.
    """
    print("Entering check_hallucination function")
    api_url = "https://api-inference.huggingface.co/models/vectara/hallucination_evaluation_model"
    header = {"Authorization": f"Bearer {hf_token}"}
    payload = {"inputs": f"{assertion} [SEP] {citation}"}
    response = requests.post(api_url, headers=header, json=payload, timeout=120)
    # Fail with the HTTP status rather than a confusing KeyError when the API
    # returns an error document instead of scores.
    response.raise_for_status()
    output = response.json()
    try:
        score = output[0][0]["score"]
    except (KeyError, IndexError, TypeError) as err:
        raise ValueError(f"Unexpected hallucination API response: {output!r}") from err
    print(f"check_hallucination output: {score}")
    return f"**hallucination score:** {score}"
# Endpoint and authorization header used by query() for hallucination scoring.
vapi_url = "https://api-inference.huggingface.co/models/vectara/hallucination_evaluation_model"
headers = {"Authorization": f"Bearer {hf_token}"}


def query(payload):
    """POST ``payload`` as JSON to ``vapi_url`` and return the decoded response.

    Args:
        payload: JSON-serialisable request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection problems or when the request
            exceeds the 120 second timeout.
    """
    print("Entering query function")
    # A timeout (matching check_hallucination) keeps a stalled endpoint from
    # blocking the caller forever.
    response = requests.post(vapi_url, headers=headers, json=payload, timeout=120)
    # Decode the body once and reuse it for both logging and the return value.
    result = response.json()
    print(f"API response: {result}")
    return result
def evaluate_hallucination(input1, input2):
    """Return a traffic-light risk label for a pair of texts.

    The two inputs are joined with ``[SEP]`` and sent through ``query``; the
    first score of the response is read. Scores below 0.5 are labelled high
    risk, everything else low risk.

    Args:
        input1: First text of the pair.
        input2: Second text of the pair.

    Returns:
        A label such as ``"🟢 Low risk. Score: 0.87"``.
    """
    print("Entering evaluate_hallucination function")
    api_output = query({"inputs": f"{input1}[SEP]{input2}"})
    score = api_output[0][0]['score']
    verdict = "🔴 High risk" if score < 0.5 else "🟢 Low risk"
    label = f"{verdict}. Score: {score:.2f}"
    print(f"evaluate_hallucination label: {label}")
    return label
def save_audio(audio_input, output_dir="saved_audio"):
    """Write an audio clip to a WAV file and return the file path.

    Args:
        audio_input: A ``(sample_rate, audio_data)`` pair.
        output_dir: Directory that receives the file; created if missing.

    Returns:
        Path of the written ``.wav`` file.
    """
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(output_dir, exist_ok=True)
    # Extract sample rate and audio data
    sample_rate, audio_data = audio_input
    # A whole-second timestamp alone is not unique: two clips saved within the
    # same second would overwrite each other, so a random suffix is appended.
    file_name = f"audio_{int(time.time())}_{uuid.uuid4().hex}.wav"
    file_path = os.path.join(output_dir, file_name)
    # Save the audio file
    sf.write(file_path, audio_data, sample_rate)
    return file_path
def save_image(image_input, output_dir="saved_images"):
    """Write an image array to a PNG file and return the file path.

    Args:
        image_input: Image as a ``numpy.ndarray``.
        output_dir: Directory that receives the file; created if missing.

    Returns:
        Path of the written ``.png`` file.

    Raises:
        ValueError: If ``image_input`` is not a ``numpy.ndarray``.
    """
    print("Entering save_image function")
    # Validate first so a rejected input leaves no directory behind.
    if not isinstance(image_input, np.ndarray):
        raise ValueError("Invalid image input type")
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(output_dir, exist_ok=True)
    image = Image.fromarray(image_input)
    # A whole-second timestamp alone is not unique: two images saved within
    # the same second would overwrite each other, so a random suffix is added.
    file_name = f"image_{int(time.time())}_{uuid.uuid4().hex}.png"
    file_path = os.path.join(output_dir, file_name)
    image.save(file_path)
    print(f"Image saved at: {file_path}")
    return file_path
def process_image(image_file_path):
    """Ask the hosted Qwen-VL chat space to describe an image file.

    A new Gradio client is created on every call. Failures of the prediction
    call are not raised; they are reported in the returned string.

    Args:
        image_file_path: Path of the image to describe.

    Returns:
        The prediction result, or an error message string if the call failed.
    """
    print("Entering process_image function")
    client = Client("https://tonic1-official-qwen-vl-chat.hf.space/--replicas/4t5dh/")  # TruEra
    instruction = "Describe this image in detail, identify every detail in this image. Describe the image the best you can."
    try:
        description = client.predict(instruction, image_file_path, fn_index=0)
        print(f"Image processing result: {description}")
        return description
    except Exception as err:
        print(f"Error in process_image: {err}")
        return f"Error occurred during image processing: {err}"
def process_speech(audio_input, source_language, target_language="English"):
    """Translate speech to text through the Seamless ``/s2tt`` endpoint.

    Args:
        audio_input: Audio to send to the endpoint; ``None`` short-circuits.
        source_language: Language name of the spoken input.
        target_language: Language name of the desired text.

    Returns:
        The endpoint result, a fixed notice when no audio was given, or an
        error message string if the call failed.
    """
    print("Entering process_speech function")
    if audio_input is None:
        return "No audio input provided."
    try:
        transcription = seamless_client.predict(
            audio_input, source_language, target_language, api_name="/s2tt"
        )
        print(f"Speech processing result: {transcription}")
        return transcription
    except Exception as err:
        message = str(err)
        print(f"Error in process_speech: {message}")
        return f"Error in speech processing: {message}"
def convert_text_to_speech(input_text, source_language, target_language):
    """Translate text and synthesise speech through the Seamless ``/t2st`` endpoint.

    Args:
        input_text: Text to translate and speak.
        source_language: Language name of ``input_text``.
        target_language: Language name for the output.

    Returns:
        ``(audio_file_path, translated_text)``. An empty result yields
        ``(None, "")``; a failed call yields ``(None, <error message>)``.
    """
    print("Entering convert_text_to_speech function")
    try:
        result = seamless_client.predict(
            input_text, source_language, target_language, api_name="/t2st"
        )
        if result:
            audio_file_path, translated_text = result[0], result[1]
        else:
            audio_file_path, translated_text = None, ""
        print(f"Text-to-speech conversion result: Audio file path: {audio_file_path}, Translated text: {translated_text}")
        return audio_file_path, translated_text
    except Exception as err:
        message = str(err)
        print(f"Error in convert_text_to_speech: {message}")
        return None, f"Error in text-to-speech conversion: {message}"
def _collect_vectara_sources(query_data, limit=5):
    """Return title/author/page metadata for the top ``limit`` hits of each response set."""
    # Vectara metadata name -> key used in the returned source dict.
    wanted = {"title": "title", "author": "author", "pageNumber": "page number"}
    sources_info = []
    for response_set in query_data.get('responseSet', []):
        for source in response_set.get('response', [])[:limit]:
            source_info = {}
            for metadata in source.get('metadata', []):
                key = wanted.get(metadata.get('name', ''))
                if key is not None:
                    source_info[key] = metadata.get('value', '')
            # Hits without any of the wanted fields are skipped.
            if source_info:
                sources_info.append(source_info)
    return sources_info


def query_vectara(text):
    """Query the Vectara corpus and return its summary plus source metadata.

    Credentials are read from the ``CUSTOMER_ID``, ``CORPUS_ID`` and
    ``API_KEY`` environment variables.

    Args:
        text: The user query.

    Returns:
        On success, a JSON string with keys ``"summary"`` and ``"sources"``.
        Otherwise a plain-text message: ``"No data found in the response."``
        when the body is empty or carries no summary, or
        ``"Error: <status>"`` for a non-200 reply.

    Raises:
        requests.RequestException: On connection problems or when the request
            exceeds the 120 second timeout.
    """
    user_message = text
    customer_id = os.getenv('CUSTOMER_ID')
    corpus_id = os.getenv('CORPUS_ID')
    api_key = os.getenv('API_KEY')
    # Define the headers
    api_key_header = {
        "customer-id": customer_id,
        "x-api-key": api_key
    }
    # Define the request body in the structure provided in the example
    request_body = {
        "query": [
            {
                "query": user_message,
                "queryContext": "",
                "start": 1,
                "numResults": 25,
                "contextConfig": {
                    "charsBefore": 0,
                    "charsAfter": 0,
                    "sentencesBefore": 2,
                    "sentencesAfter": 2,
                    "startTag": "%START_SNIPPET%",
                    "endTag": "%END_SNIPPET%",
                },
                "rerankingConfig": {
                    "rerankerId": 272725718,
                    "mmrConfig": {
                        "diversityBias": 0.35
                    }
                },
                "corpusKey": [
                    {
                        "customerId": customer_id,
                        "corpusId": corpus_id,
                        "semantics": 0,
                        "metadataFilter": "",
                        "lexicalInterpolationConfig": {
                            "lambda": 0
                        },
                        "dim": []
                    }
                ],
                "summary": [
                    {
                        "maxSummarizedResults": 5,
                        "responseLang": "auto",
                        "summarizerPromptName": "vectara-summary-ext-v1.2.0"
                    }
                ]
            }
        ]
    }
    # A timeout keeps a stalled Vectara endpoint from hanging the request.
    response = requests.post(
        "https://api.vectara.io/v1/query",
        json=request_body,  # Use json to automatically serialize the request body
        verify=True,
        headers=api_key_header,
        timeout=120,
    )
    if response.status_code != 200:
        return f"Error: {response.status_code}"
    query_data = response.json()
    if not query_data:
        return "No data found in the response."
    # An empty responseSet or summary list is reported as "no data" instead of
    # escaping as a KeyError/IndexError.
    try:
        summary = query_data['responseSet'][0]['summary'][0]['text']
    except (KeyError, IndexError, TypeError):
        return "No data found in the response."
    result = {"summary": summary, "sources": _collect_vectara_sources(query_data)}
    return json.dumps(result, indent=2)
def wrap_text(text, width=90):
    """Wrap every newline-separated line of ``text`` to ``width`` columns.

    Existing line breaks are kept; each line is wrapped independently.

    Args:
        text: Text to wrap.
        width: Maximum line length.

    Returns:
        The wrapped text.
    """
    print("Wrapping text...")
    return '\n'.join(
        textwrap.fill(segment, width=width) for segment in text.split('\n')
    )
# Load the Qwen chat tokenizer and model once at import time. Both calls pass
# trust_remote_code=True; the model is switched to eval mode.
tokenizer = AutoTokenizer.from_pretrained(
    "Qwen/Qwen-1_8B-Chat",
    trust_remote_code=True,
)  # TruEra
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen-1_8B-Chat",
    device_map="auto",
    trust_remote_code=True,
).eval()
class ChatBot:
    """Thin stateful wrapper around ``model.chat`` that carries the chat history."""

    def __init__(self):
        # No conversation yet; replaced by whatever model.chat returns.
        self.history = None

    def predict(self, user_input, system_prompt=""):
        """Send ``user_input`` to the model and return its reply.

        The history returned by the model replaces ``self.history`` so the
        next call continues the same conversation.
        """
        print("Generating prediction...")
        reply, updated_history = model.chat(
            tokenizer,
            user_input,
            history=self.history,
            system=system_prompt,
        )
        self.history = updated_history
        return reply


# Module-wide chatbot instance used by the prompt helpers.
bot = ChatBot()
def multimodal_prompt(user_input, system_prompt="You are an expert medical analyst:"):
    """Forward ``user_input`` and ``system_prompt`` to the shared ``bot`` and return its reply."""
    print("Processing multimodal prompt...")
    reply = bot.predict(user_input, system_prompt)
    return reply
def process_summary_with_qwen(summary):
    """Have the Qwen chat model elaborate on a retrieved summary.

    Args:
        summary: Summary text to be assessed by the model.

    Returns:
        The model's response text.
    """
    print("Processing summary with Qwen...")
    system_prompt = "You are a medical instructor. Assess and describe the proper options to your students in minute detail. Propose a course of action for them to base their recommendations on based on your description."
    # Use a fresh ChatBot per summary: the module-level `bot` keeps its history
    # across calls, so earlier requests (possibly from other users) would be
    # fed into this prompt.
    response_text = ChatBot().predict(summary, system_prompt)
    return response_text
def process_and_query(input_language=None, audio_input=None, image_input=None, text_input=None):
    """Run one request through retrieval, generation, speech and evaluation.

    Exactly one input is used, in this priority order: image, audio, text.
    The chosen input is turned into text, sent to ``query_vectara``, the
    summary is elaborated by ``process_summary_with_qwen``, converted to
    speech, and scored with ``evaluate_hallucination``.

    Args:
        input_language: Language name used for speech input and for the
            spoken/translated output.
        audio_input: ``(sample_rate, audio_data)`` pair, or ``None``.
        image_input: Image array, or ``None``.
        text_input: Free-text query, or ``None``.

    Returns:
        ``(markdown_output, audio_output)``. On any failure, including missing
        input, the first item is an error message and the second is ``None``.
    """
    try:
        print("Processing and querying...")
        combined_text = ""
        markdown_output = ""
        image_text = ""
        print(f"Image Input Type: {type(image_input)}, Audio Input Type: {type(audio_input)}")
        if image_input is not None:
            print("Processing image input...")
            image_file_path = save_image(image_input)
            image_text = process_image(image_file_path)
            combined_text += "\n\n**Image Input:**\n" + image_text
        elif audio_input is not None:
            print("Processing audio input...")
            audio_file_path = save_audio(audio_input)
            audio_text = process_speech(audio_file_path, input_language, "English")
            combined_text += "\n\n**Audio Input:**\n" + audio_text
        elif text_input is not None and text_input.strip():
            print("Processing text input...")
            combined_text += "The user asks the query above to his health adviser: " + text_input
        else:
            # Two outputs are wired to this function, so return a pair here
            # too, like every other exit.
            return "Error: Please provide some input (text, audio, or image).", None
        if image_text:
            markdown_output += "\n### Original Image Description\n"
            markdown_output += image_text + "\n"
        print("Querying Vectara...")
        vectara_response_json = query_vectara(combined_text)
        try:
            vectara_response = json.loads(vectara_response_json)
        except json.JSONDecodeError:
            # query_vectara reports failures as plain text; show that message
            # rather than a JSON parsing error.
            return f"Error occurred during processing: {vectara_response_json}.", None
        summary = vectara_response.get('summary', 'No summary available')
        sources_info = vectara_response.get('sources', [])
        # Append (not assign) so the image description added above is kept.
        markdown_output += "### Vectara Response Summary\n"
        markdown_output += f"* **Summary**: {summary}\n"
        markdown_output += "### Sources Information\n"
        for source in sources_info:
            markdown_output += f"* {source}\n"
        final_response = process_summary_with_qwen(summary)
        print("Converting text to speech...")
        target_language = "English"
        # The generated answer is English; speak it back in the user's language.
        audio_output, translated_text = convert_text_to_speech(final_response, target_language, input_language)
        print("Evaluating hallucination...")
        try:
            hallucination_label = evaluate_hallucination(final_response, summary)
        except Exception as e:
            # Evaluation is best-effort; the answer is still returned.
            print(f"Error in hallucination evaluation: {e}")
            hallucination_label = "Evaluation skipped due to the model loading. For evaluation results, please try again in 29 minutes."
        markdown_output += "\n### Processed Summary with Qwen\n"
        markdown_output += final_response + "\n"
        markdown_output += "\n### Hallucination Evaluation\n"
        markdown_output += f"* **Label**: {hallucination_label}\n"
        markdown_output += "\n### Translated Text\n"
        markdown_output += translated_text + "\n"
        return markdown_output, audio_output
    except Exception as e:
        print(f"Error occurred: {e}")
        return f"Error occurred during processing: {e}.", None
def clear():
    """Return reset values for the five components wired to the Clear button.

    Returns:
        ``("English", None, None, "", None)``: the default language followed
        by empty values for the remaining components.
    """
    default_language = "English"
    return default_language, None, None, "", None
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons.

    Layout: welcome text, language dropdown, three collapsed accordions
    (voice, picture, text), then the Markdown and audio outputs and the
    "Use MultiMed" and "Clear" buttons.

    Returns:
        The constructed ``gr.Blocks`` interface (not yet launched).
    """
    with gr.Blocks(theme='ParityError/Anime') as interface:
        # Display the welcome message
        gr.Markdown(welcome_message)
        # "None" is offered in addition to the known language names.
        input_language_options = ["None"] + list(languages.keys())
        input_language = gr.Dropdown(input_language_options, label="Select the language", value="English", interactive=True)
        with gr.Accordion("Use Voice", open=False):
            audio_input = gr.Audio(label="Speak")
            # Placeholder component; left unnamed so it no longer shares the
            # `audio_output` name with the real audio output below.
            gr.Markdown(label="Output text")
            gr.Examples([["audio1.wav"], ["audio2.wav"]], inputs=[audio_input])
        with gr.Accordion("Use a Picture", open=False):
            image_input = gr.Image(label="Upload image")
            # Placeholder component, never referenced afterwards.
            gr.Markdown(label="Output text")
            gr.Examples([["image1.png"], ["image2.jpeg"], ["image3.jpeg"]], inputs=[image_input])
        with gr.Accordion("MultiMed", open=False):
            text_input = gr.Textbox(label="Use Text", lines=3, placeholder="I have had a sore throat and phlegm for a few days and now my cough has gotten worse!")
            gr.Examples([
                ["What is the proper treatment for buccal herpes?"],
                ["I have had a sore throat and hoarse voice for several days and now a strong cough recently "],
                ["How does cellular metabolism work TCA cycle"],
                ["What special care must be provided to children with chicken pox?"],
                ["When and how often should I wash my hands?"],
                ["بکل ہرپس کا صحیح علاج کیا ہے؟"],
                ["구강 헤르페스의 적절한 치료법은 무엇입니까?"],
                ["Je, ni matibabu gani sahihi kwa herpes ya buccal?"],
            ], inputs=[text_input])
        text_output = gr.Markdown(label="MultiMed")
        audio_output = gr.Audio(label="Audio Out", type="filepath")
        text_button = gr.Button("Use MultiMed")
        text_button.click(process_and_query, inputs=[input_language, audio_input, image_input, text_input], outputs=[text_output, audio_output])
        clear_button = gr.Button("Clear")
        clear_button.click(clear, inputs=[], outputs=[input_language, audio_input, image_input, text_output, audio_output])
        # clear() does not cover the text box, so reset it with a second
        # listener; otherwise the old query would survive a Clear.
        clear_button.click(lambda: "", inputs=[], outputs=[text_input])
    return interface
# Build the interface and start the server; show_error and debug are enabled
# so failures are reported rather than hidden.
app = create_interface()
app.launch(
    show_error=True,
    debug=True,
)